[asterisk-scf-commits] asterisk-scf/release/bridging.git branch "master" updated.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Tue Feb 7 14:00:06 CST 2012
branch "master" has been updated
via 31946020d908b744f0969aae1704039e6b80b24a (commit)
from fff860e3c1b6e62994bfcc9eefbfe0c167316874 (commit)
Summary of changes:
config/test_bridging.conf | 4 +
src/BridgeImpl.cpp | 261 +++++++++++++++++++++----------
src/BridgeManagerImpl.cpp | 20 +--
src/BridgeManagerImpl.h | 2 +-
src/BridgeReplicatorStateListenerI.cpp | 116 ++++++++++----
src/Component.cpp | 14 ++-
src/MediaSplicer.cpp | 10 +-
src/SessionCollection.cpp | 22 +++
src/SessionCollection.h | 2 +
src/SessionListener.cpp | 6 +
src/SessionWrapper.cpp | 18 ++-
src/SessionWrapper.h | 2 +-
test/TestBridging.cpp | 88 +++++++-----
13 files changed, 393 insertions(+), 172 deletions(-)
- Log -----------------------------------------------------------------
commit 31946020d908b744f0969aae1704039e6b80b24a
Author: Brent Eagles <beagles at digium.com>
Date: Tue Feb 7 16:27:38 2012 -0330
- Fix up some logging messages.
- Add null handle checks in some of the tasks (exceptions were showing up in
the test suite).
- Quick hack for setting the log level by configuration. The component's
logger isn't being used; the file-scoped copy is used instead, which is wrong!
Still, I didn't change that: enough unrelated changes were made in this commit already.
- A lot of stuff was not calling removeState when it was going away.
- The test suite doesn't really do replication correctly, which might be
part of the problem.
diff --git a/config/test_bridging.conf b/config/test_bridging.conf
index 3b9e504..18f8235 100644
--- a/config/test_bridging.conf
+++ b/config/test_bridging.conf
@@ -82,6 +82,7 @@ TestBridge.ServiceAdapter.ThreadPool.Size=2
TestBridge.BackplaneAdapter.Endpoints=default -h 127.0.0.1 -p 57001
TestBridge.BackplaneAdapter.ThreadPool.Size=2
TestBridge.Standby=false
+TestBridge.LogLevel=0
TestBridge2.InstanceName=TestBridge2
TestBridge2.BridgeManagerObjectId=TestBridgeManager2
@@ -90,6 +91,9 @@ TestBridge2.ServiceAdapter.Endpoints=default -h 127.0.0.1 -p 57010
TestBridge2.ServiceAdapter.ThreadPool.Size=4
TestBridge2.BackplaneAdapter.Endpoints=default -h 127.0.0.1 -p 57011
TestBridge2.BackplaneAdapter.ThreadPool.Size=4
+TestBridge2.LogLevel=0
+#Ice.Trace.Network=1
+#Ice.Trace.Protocol=1
#
# Configuration for the bridge state replicator.
diff --git a/src/BridgeImpl.cpp b/src/BridgeImpl.cpp
index 1a88527..765ae55 100755
--- a/src/BridgeImpl.cpp
+++ b/src/BridgeImpl.cpp
@@ -304,22 +304,34 @@ public:
protected:
bool executeImpl()
{
+ mLogger(Trace) << FUNLOG;
+ if (!mSourceSession)
+ {
+ mLogger(Trace) << FUNLOG << " Source session is nil, returning early with true";
+ return true;
+ }
try
{
- mLogger(Trace) << FUNLOG;
// Forward the ConnectedLine to each bridged session.
SessionSeq sessions = mBridge->getSessions()->getSessionSeq();
for(SessionSeq::iterator i = sessions.begin();
i != sessions.end(); ++i)
{
- if ((*i)->ice_getIdentity() == mSourceSession->ice_getIdentity())
+ if (!(*i) || (*i)->ice_getIdentity() == mSourceSession->ice_getIdentity())
{
continue;
}
SessionWrapperPtr destSessionWrapper = mBridge->getSessions()->getSession(*i);
- forward(destSessionWrapper, mRedirections);
+ if (destSessionWrapper)
+ {
+ forward(destSessionWrapper, mRedirections);
+ }
+ else
+ {
+ mLogger(Debug) << " Destination wrapper for session does not exist, continuing";
+ }
}
}
@@ -339,31 +351,37 @@ protected:
RedirectionsPtr destSpecificRedirections = redirections;
PartyIdHooksPtr partyIdHooks = mBridge->getPartyIdHooks();
-
- // Allow the ForwardingRedirectionsPartyId hooks to alter the Redirections record.
- for(vector<ForwardingRedirectionsPartyIdHookPrx>::const_iterator i = partyIdHooks->forwardingRedirectionsHooks.begin();
- i != partyIdHooks->forwardingRedirectionsHooks.end(); ++i)
+ if (partyIdHooks)
{
- try
+ // Allow the ForwardingRedirectionsPartyId hooks to alter the Redirections record.
+ for(vector<ForwardingRedirectionsPartyIdHookPrx>::const_iterator i = partyIdHooks->forwardingRedirectionsHooks.begin();
+ i != partyIdHooks->forwardingRedirectionsHooks.end(); ++i)
{
- // Apply this hook.
- AsteriskSCF::System::Hook::V1::HookResult hookResult = (*i)->modifyForwardingRedirections(mSourceSession,
- destinationSession->getSession(),
- currentRedirections, destSpecificRedirections);
-
- if (hookResult.status == AsteriskSCF::System::Hook::V1::Succeeded)
+ try
{
- currentRedirections = destSpecificRedirections;
+ // Apply this hook.
+ AsteriskSCF::System::Hook::V1::HookResult hookResult = (*i)->modifyForwardingRedirections(mSourceSession,
+ destinationSession->getSession(),
+ currentRedirections, destSpecificRedirections);
+
+ if (hookResult.status == AsteriskSCF::System::Hook::V1::Succeeded)
+ {
+ currentRedirections = destSpecificRedirections;
+ }
+ }
+ catch(const std::exception& e)
+ {
+ mLogger(Warning) << FUNLOG << " : " << e.what();
}
- }
- catch(const std::exception& e)
- {
- mLogger(Warning) << FUNLOG << " : " << e.what();
}
}
- // Forward the info via the SessionController for this session.
- destinationSession->getSessionController()->updateRedirections(currentRedirections);
+ SessionControllerPrx sessionController = destinationSession->getSessionController();
+ if (sessionController)
+ {
+ // Forward the info via the SessionController for this session.
+ sessionController->updateRedirections(currentRedirections);
+ }
}
private:
@@ -401,10 +419,14 @@ public:
protected:
bool executeImpl()
{
+ mLogger(Trace) << FUNLOG;
+ if (!mSourceSession)
+ {
+ mLogger(Trace) << FUNLOG << " Source session is nil, returning early with true";
+ return true;
+ }
try
{
- mLogger(Trace) << FUNLOG;
-
ConnectedLinePtr currentConnectedLine;
SessionWrapperPtr sourceWrapper = mBridge->getSessions()->getSession(mSourceSession);
bool isSet = sourceWrapper->getConnectedLine(currentConnectedLine);
@@ -420,13 +442,20 @@ protected:
for(SessionSeq::iterator i = sessions.begin();
i != sessions.end(); ++i)
{
- if ((*i)->ice_getIdentity() == mSourceSession->ice_getIdentity())
+ if (!(*i) || (*i)->ice_getIdentity() == mSourceSession->ice_getIdentity())
{
continue;
}
SessionWrapperPtr destSessionWrapper = mBridge->getSessions()->getSession(*i);
- forward(destSessionWrapper, currentConnectedLine);
+ if (destSessionWrapper)
+ {
+ forward(destSessionWrapper, currentConnectedLine);
+ }
+ else
+ {
+ mLogger(Debug) << " Destination wrapper for session does not exist, continuing";
+ }
}
}
@@ -447,30 +476,37 @@ protected:
PartyIdHooksPtr partyIdHooks = mBridge->getPartyIdHooks();
- // Allow the ForwardingConnectedLinePartyId hooks to alter the ConnectedLine record.
- for(vector<ForwardingConnectedLinePartyIdHookPrx>::const_iterator i = partyIdHooks->forwardingConnectedLineHooks.begin();
- i != partyIdHooks->forwardingConnectedLineHooks.end(); ++i)
+ if (partyIdHooks)
{
- try
+ // Allow the ForwardingConnectedLinePartyId hooks to alter the ConnectedLine record.
+ for(vector<ForwardingConnectedLinePartyIdHookPrx>::const_iterator i = partyIdHooks->forwardingConnectedLineHooks.begin();
+ i != partyIdHooks->forwardingConnectedLineHooks.end(); ++i)
{
- // Apply a hook
- AsteriskSCF::System::Hook::V1::HookResult hookResult = (*i)->modifyForwardingConnectedLine(mSourceSession,
- destinationSession->getSession(),
- currentConnectedLine, destSpecificConnectedLine);
-
- if (hookResult.status == AsteriskSCF::System::Hook::V1::Succeeded)
+ try
{
- currentConnectedLine = destSpecificConnectedLine;
+ // Apply a hook
+ AsteriskSCF::System::Hook::V1::HookResult hookResult = (*i)->modifyForwardingConnectedLine(mSourceSession,
+ destinationSession->getSession(),
+ currentConnectedLine, destSpecificConnectedLine);
+
+ if (hookResult.status == AsteriskSCF::System::Hook::V1::Succeeded)
+ {
+ currentConnectedLine = destSpecificConnectedLine;
+ }
+ }
+ catch (const std::exception& e)
+ {
+ mLogger(Debug) << FUNLOG << " : " << e.what();
}
- }
- catch (const std::exception& e)
- {
- mLogger(Debug) << FUNLOG << " : " << e.what();
}
}
- // Forward the info via the SessionController for this session.
- destinationSession->getSessionController()->updateConnectedLine(currentConnectedLine);
+ SessionControllerPrx sessionController = destinationSession->getSessionController();
+ if (sessionController)
+ {
+ // Forward the info via the SessionController for this session.
+ sessionController->updateConnectedLine(currentConnectedLine);
+ }
}
private:
@@ -497,22 +533,35 @@ public:
protected:
bool executeImpl()
{
+ mLogger(Trace) << FUNLOG;
+ if (!mSource)
+ {
+ mLogger(Trace) << FUNLOG << " Source session is nil, returning early with true";
+ return true;
+ }
+
try
{
- mLogger(Trace) << FUNLOG;
// Forward the Caller to each bridged session.
SessionSeq sessions = mBridge->getSessions()->getSessionSeq();
for(SessionSeq::iterator i = sessions.begin();
i != sessions.end(); ++i)
{
- if ((*i)->ice_getIdentity() == mSource->ice_getIdentity())
+ if (!(*i) || (*i)->ice_getIdentity() == mSource->ice_getIdentity())
{
continue;
}
SessionWrapperPtr destSessionWrapper = mBridge->getSessions()->getSession(*i);
- forward(destSessionWrapper, mCallerID);
+ if (destSessionWrapper)
+ {
+ forward(destSessionWrapper, mCallerID);
+ }
+ else
+ {
+ mLogger(Debug) << " Destination wrapper for session does not exist, continuing";
+ }
}
}
@@ -524,7 +573,7 @@ protected:
return true;
}
- void forward(const SessionWrapperPtr destinationSession, const CallerPtr& callerID)
+ void forward(const SessionWrapperPtr& destinationSession, const CallerPtr& callerID)
{
mLogger(Trace) << FUNLOG;
@@ -533,30 +582,38 @@ protected:
PartyIdHooksPtr partyIdHooks = mBridge->getPartyIdHooks();
- // Allow the ForwardingConnectedLinePartyId hooks to alter the ConnectedLine record.
- for(vector<ForwardingCallerPartyIdHookPrx>::const_iterator i = partyIdHooks->forwardingCallerHooks.begin();
- i != partyIdHooks->forwardingCallerHooks.end(); ++i)
+ if (partyIdHooks)
{
- try
+ // Allow the ForwardingConnectedLinePartyId hooks to alter the ConnectedLine record.
+ for(vector<ForwardingCallerPartyIdHookPrx>::const_iterator i = partyIdHooks->forwardingCallerHooks.begin();
+ i != partyIdHooks->forwardingCallerHooks.end(); ++i)
{
- // Apply a hook
- AsteriskSCF::System::Hook::V1::HookResult hookResult = (*i)->modifyForwardingCaller(mSource,
- destinationSession->getSession(),
- currentCallerID, destSpecificCallerID);
-
- if (hookResult.status == AsteriskSCF::System::Hook::V1::Succeeded)
+ try
{
- currentCallerID = destSpecificCallerID;
+ // Apply a hook
+ AsteriskSCF::System::Hook::V1::HookResult hookResult = (*i)->modifyForwardingCaller(mSource,
+ destinationSession->getSession(),
+ currentCallerID, destSpecificCallerID);
+
+ if (hookResult.status == AsteriskSCF::System::Hook::V1::Succeeded)
+ {
+ currentCallerID = destSpecificCallerID;
+ }
+ }
+ catch (const std::exception& e)
+ {
+ mLogger(Debug) << FUNLOG << " : " << e.what();
}
- }
- catch (const std::exception& e)
- {
- mLogger(Debug) << FUNLOG << " : " << e.what();
}
}
-
+ //
// Forward the info via the SessionController for this session.
- destinationSession->getSessionController()->updateCallerID(currentCallerID);
+ //
+ SessionControllerPrx sessionController = destinationSession->getSessionController();
+ if (sessionController)
+ {
+ sessionController->updateCallerID(currentCallerID);
+ }
}
private:
@@ -589,31 +646,43 @@ protected:
bool executeImpl()
{
mLogger(Trace) << FUNLOG;
+ if (!mSourceSession)
+ {
+ mLogger(Trace) << FUNLOG << "Source session is nil, returning early with true.";
+ return true;
+ }
ConnectedLinePtr currentConnectedLine = mConnectedLine;
ConnectedLinePtr updatedConnectedLine = mConnectedLine;
SessionWrapperPtr wrapper = mBridge->getSessions()->getSession(mSourceSession);
- PartyIdHooksPtr partyIdHooks = mBridge->getPartyIdHooks();
+ if (!wrapper)
+ {
+ mLogger(Debug) << "Unable to find matching session for, returning early with true.";
+ }
- // Allow the ReceivedConnectedLinePartyId hooks to alter the ConnectedLine record.
- for(vector<ReceivedConnectedLinePartyIdHookPrx>::const_iterator i = partyIdHooks->receivedConnectedLineHooks.begin();
- i != partyIdHooks->receivedConnectedLineHooks.end(); ++i)
+ PartyIdHooksPtr partyIdHooks = mBridge->getPartyIdHooks();
+ if (partyIdHooks)
{
- try
+ // Allow the ReceivedConnectedLinePartyId hooks to alter the ConnectedLine record.
+ for(vector<ReceivedConnectedLinePartyIdHookPrx>::const_iterator i = partyIdHooks->receivedConnectedLineHooks.begin();
+ i != partyIdHooks->receivedConnectedLineHooks.end(); ++i)
{
- // Apply this hook.
- AsteriskSCF::System::Hook::V1::HookResult hookResult = (*i)->modifyReceivedConnectedLine(mSourceSession,
- currentConnectedLine, updatedConnectedLine);
-
- if (hookResult.status == AsteriskSCF::System::Hook::V1::Succeeded)
+ try
{
- currentConnectedLine = updatedConnectedLine;
+ // Apply this hook.
+ AsteriskSCF::System::Hook::V1::HookResult hookResult = (*i)->modifyReceivedConnectedLine(mSourceSession,
+ currentConnectedLine, updatedConnectedLine);
+
+ if (hookResult.status == AsteriskSCF::System::Hook::V1::Succeeded)
+ {
+ currentConnectedLine = updatedConnectedLine;
+ }
+ }
+ catch (const std::exception& e)
+ {
+ mLogger(Debug) << FUNLOG << " : " << e.what();
}
- }
- catch (const std::exception& e)
- {
- mLogger(Debug) << FUNLOG << " : " << e.what();
}
}
@@ -1544,7 +1613,6 @@ void BridgeImpl::shutdown(const Ice::Current& current)
update = createUpdate();
}
pushUpdate(update);
- update = 0;
{
//
// Currently the slice defines the response "Normal Clearing" as the default
@@ -1560,12 +1628,24 @@ void BridgeImpl::shutdown(const Ice::Current& current)
//
// Remove references to the session listener implementation.
//
- update = createUpdate();
mObjAdapter->remove(mSessionListenerPrx->ice_getIdentity());
mSessionListener = 0;
}
+
+ try
+ {
+ if (replicate())
+ {
+ Ice::StringSeq keys;
+ keys.push_back(mState->key);
+ mReplicator->removeState(keys);
+ }
+ }
+ catch (const Ice::Exception&)
+ {
+ }
+
mSessions = 0;
- pushUpdate(update);
}
void BridgeImpl::destroy(const Ice::Current& current)
@@ -1596,6 +1676,19 @@ void BridgeImpl::destroy(const Ice::Current& current)
// Remove references to the session listener implementation.
//
mObjAdapter->remove(mSessionListenerPrx->ice_getIdentity());
+
+ try
+ {
+ if (replicate())
+ {
+ Ice::StringSeq keys;
+ keys.push_back(mState->key);
+ mReplicator->removeState(keys);
+ }
+ }
+ catch (const Ice::Exception&)
+ {
+ }
}
void BridgeImpl::addListener(const BridgeListenerPrx& listener, const Ice::Current& current)
@@ -1634,7 +1727,7 @@ void BridgeImpl::removeListener(const BridgeListenerPrx& listener, const Ice::Cu
}
if (mListeners->removeListener(listener))
{
- string key = mState->key + ".listener." +
+ string key = mState->bridgeId + ".listener." +
mObjAdapter->getCommunicator()->identityToString(listener->ice_getIdentity());
if (replicate())
{
@@ -1993,7 +2086,7 @@ BridgeStateItemPtr BridgeImpl::createUpdate()
//
if (replicate())
{
- BridgeStateItemPtr result = new BridgeStateItem(*mState.get());
+ BridgeStateItemPtr result = BridgeStateItemPtr::dynamicCast(mState->ice_clone());
return result;
}
return 0;
@@ -2058,7 +2151,7 @@ AsteriskSCF::BridgeService::BridgeServant::create(const Ice::ObjectAdapterPtr& o
logger(Trace) << FUNLOG << ": creating replica for " << state->bridgeId;
IceUtil::Handle<AsteriskSCF::BridgeService::BridgeServant> bridge(
new BridgeImpl(state->bridgeId, objectAdapter, vector<BridgeListenerPrx>(),
- listenerMgr, replicator, state, logger));
+ listenerMgr, replicator, BridgeStateItemPtr::dynamicCast(state->ice_clone()), logger));
return bridge;
}
diff --git a/src/BridgeManagerImpl.cpp b/src/BridgeManagerImpl.cpp
index 9d2c83f..a12c94c 100644
--- a/src/BridgeManagerImpl.cpp
+++ b/src/BridgeManagerImpl.cpp
@@ -46,6 +46,10 @@ namespace
#ifndef _NDEBUG
void dump(const Logger& logger, const BridgeCreationHookDataPtr& hookData)
{
+ //
+ // TODO: this whole thing should not be run if running at the "Trace"
+ // log level.
+ //
ostringstream os;
os << "Hook data:";
string indent = " ";
@@ -67,7 +71,7 @@ void dump(const Logger& logger, const BridgeCreationHookDataPtr& hookData)
{
os << prefix << indent << indent << (*i);
}
- logger(Debug) << os.str();
+ logger(Trace) << os.str();
}
#else
void dump(const Logger&, const BridgeCreationHookDataPtr&)
@@ -124,7 +128,7 @@ public:
void createBridgeReplica(const BridgeStateItemPtr& bridgeState);
- void removeBridge(const BridgeStateItemPtr& bridgeState);
+ void removeBridge(const string& bridgeState);
private:
@@ -484,13 +488,7 @@ void BridgeManagerImpl::updateState(const BridgeManagerStateItemPtr& state)
{
mLogger(Trace) << FUNLOG;
boost::unique_lock<boost::shared_mutex> lock(mLock);
- //
- // We perform a deep copy because there are no guarantees about the thread safety of the memory
- // pointed to be "state" over time. We could "say" that the call acquires ownership, but its
- // safer to take the added cost of the copy.
- //
- *mState = *state;
-
+ mState = BridgeManagerStateItemPtr::dynamicCast(state->ice_clone());
mPartyIdExtensionPoint->replaceHooks(state->partyIdExtensionPointHooks);
}
@@ -561,13 +559,13 @@ void BridgeManagerImpl::createBridgeReplica(const BridgeStateItemPtr& state)
mBridges.push_back(info);
}
-void BridgeManagerImpl::removeBridge(const BridgeStateItemPtr& state)
+void BridgeManagerImpl::removeBridge(const string& bridgeId)
{
mLogger(Trace) << FUNLOG;
boost::unique_lock<boost::shared_mutex> lock(mLock);
for (vector<BridgeInfo>::iterator i = mBridges.begin(); i != mBridges.end(); ++i)
{
- if (i->servant->id() == state->bridgeId)
+ if (i->servant->id() == bridgeId)
{
i->servant->destroyImpl();
mBridges.erase(i);
diff --git a/src/BridgeManagerImpl.h b/src/BridgeManagerImpl.h
index ff2f6ae..ab2b6aa 100644
--- a/src/BridgeManagerImpl.h
+++ b/src/BridgeManagerImpl.h
@@ -56,7 +56,7 @@ public:
virtual void createBridgeReplica(const AsteriskSCF::Replication::BridgeService::V1::BridgeStateItemPtr& bridgeState) = 0;
- virtual void removeBridge(const AsteriskSCF::Replication::BridgeService::V1::BridgeStateItemPtr& bridgeState) = 0;
+ virtual void removeBridge(const std::string& bridgeId) = 0;
};
typedef IceUtil::Handle<BridgeManagerServant> BridgeManagerServantPtr;
diff --git a/src/BridgeReplicatorStateListenerI.cpp b/src/BridgeReplicatorStateListenerI.cpp
index fcbd5bf..97ebf96 100644
--- a/src/BridgeReplicatorStateListenerI.cpp
+++ b/src/BridgeReplicatorStateListenerI.cpp
@@ -41,18 +41,40 @@ public:
{
}
+ void removeBridgeItem(const string& bridgeKey, const string& item, const string& sliceType)
+ {
+ IceUtil::Mutex::Lock lock(mMutex);
+ mLogger(Trace) << " removing bridge item id " << item << " (" << sliceType << ") from bridge " << bridgeKey;
+ mBridgeItemMap[bridgeKey].erase(item);
+ if (mBridgeItemMap[bridgeKey].empty())
+ {
+ map<string, ReplicatedStateItemPtr>::iterator entry = mItems.find(bridgeKey);
+ if (entry == mItems.end())
+ {
+ mLogger(Trace) << "no items left, and the bridge itself is no longer in item dictionary, it is safe to remove the bridge";
+ mManager->removeBridge(bridgeKey);
+ }
+ }
+ }
+
void stateRemoved(const Ice::StringSeq& itemKeys, const Ice::Current& current)
{
for (Ice::StringSeq::const_iterator k = itemKeys.begin(); k != itemKeys.end(); ++k)
{
-
- map<string, ReplicatedStateItemPtr>::iterator entry = mItems.find((*k));
- if (entry != mItems.end())
+ ReplicatedStateItemPtr item;
+ {
+ IceUtil::Mutex::Lock lock(mMutex);
+ map<string, ReplicatedStateItemPtr>::iterator entry = mItems.find((*k));
+ if (entry != mItems.end())
+ {
+ item = entry->second;
+ mItems.erase(entry);
+ }
+ }
+ if (item)
{
- ReplicatedStateItemPtr item = entry->second;
mLogger(Trace) << " received removal of " << (*k) << ": a " << item->ice_id();
- mItems.erase(entry);
BridgedSessionPtr bridgedSessionItem = BridgedSessionPtr::dynamicCast(item);
if (bridgedSessionItem)
{
@@ -68,6 +90,7 @@ public:
// Keep the session list clean.
//
found = true;
+ removeBridgeItem(bridgedSessionItem->bridgeId, bridgedSessionItem->key, item->ice_id());
}
//
// We could break here if we could be sure that there were no other updates.
@@ -85,6 +108,7 @@ public:
if ((*b) && (*b)->id() == bridgeListener->bridgeId)
{
(*b)->removeListener(bridgeListener);
+ removeBridgeItem(bridgeListener->bridgeId, bridgeListener->key, item->ice_id());
}
//
// We could break here if we could be sure that there were no other updates.
@@ -100,13 +124,23 @@ public:
{
dumpState(cerr, bridgeItem, current.adapter->getCommunicator());
}
- mManager->removeBridge(bridgeItem);
+ if (mBridgeItemMap[bridgeItem->bridgeId].empty())
+ {
+ mManager->removeBridge(bridgeItem->bridgeId);
+ }
continue;
}
//
// Session pairings are cleaned up by way of sessions going away.
//
+ SessionPairingPtr pairing = SessionPairingPtr::dynamicCast(item);
+ if (pairing)
+ {
+ removeBridgeItem(pairing->bridgeKey, pairing->key, item->ice_id());
+ continue;
+ }
+
///
// The bridge manager isn't really removable.
@@ -121,30 +155,34 @@ public:
{
mLogger(Trace) << " received update " << (*i)->serial << " for " << (*i)->key << " (a " <<
(*i)->ice_id() << ")";
- map<string, ReplicatedStateItemPtr>::iterator entry = mItems.find((*i)->key);
ReplicatedStateItemPtr existingItem;
- if (entry != mItems.end())
{
- //
- // Note: Another serial number situation to consider is two updates of the same key with
- // the same serial number.
- //
+ IceUtil::Mutex::Lock lock(mMutex);
+ map<string, ReplicatedStateItemPtr>::iterator entry = mItems.find((*i)->key);
+ if (entry != mItems.end())
+ {
+ //
+ // Note: Another serial number situation to consider is two updates of the same key with
+ // the same serial number.
+ //
- //
- // Look at serial numbers and indicate an out of sequence update. We should
- // ignore out of sequence updates.
- //
- if ((entry->second->serial > (*i)->serial) && (*i)->serial > SerialCounterStart)
+ //
+ // Look at serial numbers and indicate an out of sequence update. We should
+ // ignore out of sequence updates.
+ //
+ if ((entry->second->serial > (*i)->serial) && (*i)->serial > SerialCounterStart)
+ {
+ mLogger(Error) << "Update serial number for " << (*i)->key << " out of sequence! " <<
+ (*i)->serial << " updating " << entry->second->serial;
+ continue;
+ }
+ existingItem = entry->second;
+ entry->second = (*i);
+ }
+ else
{
- mLogger(Error) << "Update serial number for " << (*i)->key << " out of sequence! " <<
- (*i)->serial << " updating " << entry->second->serial;
- continue;
+ mItems[(*i)->key] = *i;
}
- existingItem = entry->second;
- }
- else
- {
- mItems[(*i)->key] = *i;
}
BridgeManagerStateItemPtr managerItem = BridgeManagerStateItemPtr::dynamicCast((*i));
@@ -181,13 +219,16 @@ public:
// We could break here if we could be sure that there were no other updates.
//
}
-
- if (!found)
+ //
+ // If a race condition occurs on state updates and a bridge and its replica objects are in the process of
+ // shutting down, we need to avoid attempting a recreation of a replica.
+ //
+ if (!found && (bridgeItem->runningState != ShuttingDown && bridgeItem->runningState != Destroyed))
{
if (existingItem)
{
- mLogger(Error) << "Replica listener has a bridge object that the bridge manager "
+ mLogger(Warning) << "Replica listener has a bridge object that the bridge manager "
"does not know about. This likely indicates an error and should be investigated.";
}
mManager->createBridgeReplica(bridgeItem);
@@ -200,6 +241,7 @@ public:
{
vector<BridgeServantPtr> bridges = mManager->getBridges();
bool found = false;
+ string unique = IceUtil::generateUUID();
for (vector<BridgeServantPtr>::iterator b = bridges.begin(); b != bridges.end(); ++b)
{
if ((*b) && (*b)->id() == bridgedSessionItem->bridgeId)
@@ -210,6 +252,7 @@ public:
// Keep the session list clean.
//
found = true;
+ mBridgeItemMap[bridgedSessionItem->bridgeId][bridgedSessionItem->key] = bridgedSessionItem;
}
//
// We could break here if we could be sure that there were no other updates.
@@ -217,7 +260,8 @@ public:
}
if (!found)
{
- mLogger(Error) << "received an update for a session on a bridge that does not exist!";
+ mLogger(Warning) << "received a " << bridgedSessionItem->ice_id() << " (id: " << bridgedSessionItem->key << ")"
+ << " update for a session on a bridge that does not exist: " << bridgedSessionItem->bridgeId;
}
continue;
@@ -241,6 +285,7 @@ public:
// Keep the session list clean.
//
found = true;
+ mBridgeItemMap[sessionPairing->bridgeKey][sessionPairing->key] = sessionPairing;
}
//
// We could break here if we could be sure that there were no other updates.
@@ -248,7 +293,8 @@ public:
}
if (!found)
{
- mLogger(Error) << "received an update for a session on a bridge that does not exist!";
+ mLogger(Warning) << "received a " << sessionPairing->ice_id() << " (id: " << sessionPairing->key << ")"
+ << " update for a session on a bridge that does not exist: " << sessionPairing->bridgeKey;
}
continue;
@@ -264,6 +310,8 @@ public:
if ((*b) && (*b)->id() == bridgeListener->bridgeId)
{
(*b)->addListener(bridgeListener);
+ mBridgeItemMap[bridgeListener->bridgeId][bridgeListener->key] = bridgeListener;
+ found = true;
}
//
// We could break here if we could be sure that there were no other updates.
@@ -271,21 +319,23 @@ public:
}
if (!found)
{
- mLogger(Error) << "received an update for a session on a bridge that does not exist!";
+ mLogger(Warning) << "received a " << bridgeListener->ice_id() << " (id: " << bridgeListener->key << ")"
+ << " update for a session on a bridge that does not exist: " << bridgeListener->bridgeId;
}
continue;
}
- mLogger(Info) << "Bridge replicator service received an unrecognized replication item.";
+ mLogger(Error) << "Bridge replicator service received an unrecognized replication item: " << (*i)->ice_id();
}
}
private:
-
+ IceUtil::Mutex mMutex;
map<string, ReplicatedStateItemPtr> mItems;
Logger mLogger;
BridgeManagerServantPtr mManager;
+ map<string, map<string, ReplicatedStateItemPtr> > mBridgeItemMap;
};
}
diff --git a/src/Component.cpp b/src/Component.cpp
index 3c1de74..3e875f8 100644
--- a/src/Component.cpp
+++ b/src/Component.cpp
@@ -57,7 +57,9 @@ public:
Component() :
AsteriskSCF::Component::Component(lg,
"BridgeService"),
- mListeningToReplicator(false) {}
+ mListeningToReplicator(false)
+ {
+ }
private:
// Optional base Component notification overrides.
@@ -120,6 +122,16 @@ void Component::onActivated()
void Component::onPreInitialize()
{
lg(Debug) << "Launching AsteriskSCF Session-Oriented Bridging Service " << getName();
+ Ice::Int logLevel = getCommunicator()->getProperties()->getPropertyAsIntWithDefault(getName() + ".LogLevel", static_cast<Ice::Int>(Debug));
+ if (logLevel >= 0 && logLevel <= static_cast<Ice::Int>(AsteriskSCF::System::Logging::Off))
+ {
+ lg.setLevel(static_cast<AsteriskSCF::System::Logging::Level>(logLevel));
+ mLogger.setLevel(static_cast<AsteriskSCF::System::Logging::Level>(logLevel));
+ }
+ else
+ {
+ mLogger(Error) << "Configuration attempted to set log level to an invalid value.";
+ }
}
/**
diff --git a/src/MediaSplicer.cpp b/src/MediaSplicer.cpp
index 0a90b3f..6d69be0 100755
--- a/src/MediaSplicer.cpp
+++ b/src/MediaSplicer.cpp
@@ -233,7 +233,6 @@ public:
//
void destroy()
{
- SessionPairingPtr newState;
vector<OutgoingPairing> outgoing;
vector<IncomingPairing> incoming;
{
@@ -243,9 +242,14 @@ public:
incoming = mIncoming;
mIncoming.clear();
mConnected = false;
- newState = createUpdate();
}
- pushUpdate(newState);
+ if (mReplicator)
+ {
+ vector<string> keys;
+ keys.push_back(mKey);
+ mReplicator->removeState(keys);
+ mReplicator = AsteriskSCF::BridgeService::ReplicatorSmartPrx();
+ }
mLogger(Trace) << FUNLOG << ": unplugging sinks and sources";
//
diff --git a/src/SessionCollection.cpp b/src/SessionCollection.cpp
index bd3f1ff..9b6c815 100644
--- a/src/SessionCollection.cpp
+++ b/src/SessionCollection.cpp
@@ -202,8 +202,11 @@ void SessionCollection::replicaUpdate(const BridgedSessionPtr& session)
void SessionCollection::removeSession(const BridgedSessionPtr& session)
{
+ SessionWrapperPtr removedSession;
+ {
boost::unique_lock<boost::shared_mutex> lock(mLock);
SessionMap::iterator i = mMap.find(session->key);
+ removedSession = i->second;
if (i != mMap.end())
{
mMap.erase(i);
@@ -214,4 +217,23 @@ void SessionCollection::removeSession(const BridgedSessionPtr& session)
{
mSplicer->disableMixing();
}
+ }
+ if (removedSession)
+ {
+ removedSession->destroy();
+ }
+}
+
+void SessionCollection::destroy()
+{
+ SessionMap copy;
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ copy = mMap;
+ mMap.clear();
+ }
+ for (SessionMap::iterator i = copy.begin(); i != copy.end();)
+ {
+ i->second->destroy();
+ }
}
diff --git a/src/SessionCollection.h b/src/SessionCollection.h
index a9c0264..16d8649 100644
--- a/src/SessionCollection.h
+++ b/src/SessionCollection.h
@@ -132,6 +132,8 @@ public:
void replicaUpdate(const AsteriskSCF::Replication::BridgeService::V1::BridgedSessionPtr& bridgedSession);
void removeSession(const AsteriskSCF::Replication::BridgeService::V1::BridgedSessionPtr& bridgedSession);
+
+ void destroy();
private:
diff --git a/src/SessionListener.cpp b/src/SessionListener.cpp
index c6571e3..21778b2 100644
--- a/src/SessionListener.cpp
+++ b/src/SessionListener.cpp
@@ -217,6 +217,12 @@ public:
mBridgePrx->shutdown();
}
}
+ catch (const Ice::ObjectNotExistException&)
+ {
+ //
+ // The bridge has already gone away... this is okay.
+ //
+ }
catch (const Ice::Exception& ex)
{
mLogger(Error) << "Unexpected exception when initiating auto shutdown: " << ex.what();
diff --git a/src/SessionWrapper.cpp b/src/SessionWrapper.cpp
index 480a7e8..5996a76 100644
--- a/src/SessionWrapper.cpp
+++ b/src/SessionWrapper.cpp
@@ -491,6 +491,8 @@ void SessionWrapper::setupMedia()
// basic method. If we need to prevent this from recurring, then it needs
// to be in the caller's logic, not here.
//
+ // TODO: State check
+ //
mConnector = 0;
mSplicer->connect(this, mSession->session);
}
@@ -500,6 +502,9 @@ void SessionWrapper::setConnector(const MediaConnectorPtr& connector)
mLogger(Trace) << FUNLOG << " for " << mId;
{
boost::unique_lock<boost::shared_mutex> lock(mLock);
+ //
+ // TODO: state check?
+ //
mConnector = connector;
}
}
@@ -509,6 +514,9 @@ void SessionWrapper::updateMedia(const SessionPairingPtr& pairings)
mLogger(Trace) << FUNLOG << " for " << mId;
boost::unique_lock<boost::shared_mutex> lock(mLock);
+ //
+ // TODO: shouldn't the state be checked here?
+ //
mSplicer->update(pairings);
if (mConnector)
@@ -567,10 +575,15 @@ void SessionWrapper::destroy()
//
unplugMedia();
mSession->currentState = Done;
- newState = createUpdate();
}
}
- pushUpdate(newState);
+ if (mReplicator)
+ {
+ vector<string> keys;
+ keys.push_back(mSession->key);
+ mReplicator->removeState(keys);
+ mReplicator = AsteriskSCF::BridgeService::ReplicatorSmartPrx();
+ }
}
bool SessionWrapper::isDestroyed()
@@ -644,7 +657,6 @@ bool SessionWrapper::getConnectedLine(ConnectedLinePtr& connectedLine)
return true;
}
-
AsteriskSCF::Replication::BridgeService::V1::BridgedSessionState SessionWrapper::setState(const AsteriskSCF::Replication::BridgeService::V1::BridgedSessionState newState)
{
mLogger(Trace) << FUNLOG << ": updating state " << mId;
diff --git a/src/SessionWrapper.h b/src/SessionWrapper.h
index c11a07b..b03acc4 100644
--- a/src/SessionWrapper.h
+++ b/src/SessionWrapper.h
@@ -158,7 +158,7 @@ public:
* @return true if the connectedLine parameter was set.
*/
bool getConnectedLine(AsteriskSCF::SessionCommunications::PartyIdentification::V1::ConnectedLinePtr& connectedLine);
-
+
private:
mutable boost::shared_mutex mLock;
diff --git a/test/TestBridging.cpp b/test/TestBridging.cpp
index 75d3f23..69410c3 100644
--- a/test/TestBridging.cpp
+++ b/test/TestBridging.cpp
@@ -335,6 +335,42 @@ typedef IceUtil::Handle<TestEnvironment> TestEnvironmentPtr;
TestEnvironmentPtr globalTestEnvironment;
//
+// Some tests leave cruft behind if they fail. This helper shuts down every bridge the manager still
+// lists so later tests can rely on a known baseline; per-bridge failures are logged, not propagated.
+//
+void cleanupBridges(const AsteriskSCF::SessionCommunications::V1::BridgeManagerPrx& mgr)
+{
+    BridgeSeq bridges = mgr->listBridges();
+    for (BridgeSeq::const_iterator iter = bridges.begin(); iter != bridges.end(); ++iter)
+    {
+        try
+        {
+            (*iter)->shutdown();
+        }
+        catch (const Ice::ObjectNotExistException&)
+        {
+            //
+            // Good! The bridge is already gone, so there is nothing to clean up.
+            //
+        }
+        catch (const exception& ex)
+        {
+            //
+            // Not great. Report it (newline-terminated) and keep going with the remaining bridges.
+            //
+            cerr << ex.what() << " thrown when cleaning up bridges" << '\n';
+        }
+        catch (...)
+        {
+            //
+            // Even less great: nothing to describe the failure with, but still keep going.
+            //
+            cerr << "Unknown exception thrown when cleaning up bridges" << '\n';
+        }
+    }
+}
+
+//
// Simple helper for initializing Ice and ensuring proper cleanup in the context of a test case.
//
class IceEnvironment
@@ -489,6 +525,7 @@ public:
addServant(listenerPrx, testAdapter, servant, testEnv.strToIdent(IceUtil::generateUUID()));
AsteriskSCF::SessionCommunications::V1::BridgeManagerPrx mgrPrx = env()->primaryBridgeManager();
BOOST_CHECK(mgrPrx);
+ cleanupBridges(mgrPrx);
mgrPrx->addListener(listenerPrx);
BOOST_CHECK(servant->stoppingCalls() == 0);
@@ -532,6 +569,7 @@ public:
AsteriskSCF::SessionCommunications::V1::BridgeManagerPrx mgrPrx = env()->primaryBridgeManager();
BOOST_CHECK(mgrPrx);
+ cleanupBridges(mgrPrx);
mgrPrx->addListener(listenerPrx);
BOOST_CHECK(servant->stoppingCalls() == 0);
BOOST_CHECK(servant->stoppedCalls() == 0);
@@ -639,6 +677,7 @@ public:
AsteriskSCF::SessionCommunications::V1::BridgeManagerPrx mgrPrx = env()->primaryBridgeManager();
BOOST_CHECK(mgrPrx);
+ cleanupBridges(mgrPrx);
mgrPrx->addListener(listenerPrx);
BOOST_CHECK(servant->stoppingCalls() == 0);
BOOST_CHECK(servant->stoppedCalls() == 0);
@@ -736,6 +775,7 @@ public:
AsteriskSCF::SessionCommunications::V1::BridgeManagerPrx mgrPrx = env()->primaryBridgeManager();
BOOST_CHECK(mgrPrx);
+ cleanupBridges(mgrPrx);
mgrPrx->addListener(listenerPrx);
BOOST_CHECK(servant->stoppingCalls() == 0);
BOOST_CHECK(servant->stoppedCalls() == 0);
@@ -850,6 +890,7 @@ public:
AsteriskSCF::SessionCommunications::V1::BridgeManagerPrx mgrPrx = env()->primaryBridgeManager();
BOOST_CHECK(mgrPrx);
+ cleanupBridges(mgrPrx);
mgrPrx->addListener(listenerPrx);
BOOST_CHECK(servant->stoppingCalls() == 0);
BOOST_CHECK(servant->stoppedCalls() == 0);
@@ -906,6 +947,7 @@ public:
testAdapter->activate();
AsteriskSCF::SessionCommunications::V1::BridgeManagerPrx mgrPrx = env()->primaryBridgeManager();
BOOST_CHECK(mgrPrx);
+ cleanupBridges(mgrPrx);
AsteriskSCF::SessionCommunications::V1::SessionSeq sessions;
AsteriskSCF::SessionCommunications::V1::SessionPrx a = channel.getSession("111");
a->start();
@@ -990,6 +1032,7 @@ public:
AsteriskSCF::SessionCommunications::V1::BridgeManagerPrx mgrPrx = env()->primaryBridgeManager();
BOOST_CHECK(mgrPrx);
+ cleanupBridges(mgrPrx);
AsteriskSCF::SessionCommunications::V1::BridgeManagerPrx mgrPrx2 = env()->standbyBridgeManager();
BOOST_CHECK(mgrPrx2);
@@ -1021,39 +1064,6 @@ public:
BOOST_CHECK(servant->createCalls() == 2);
bridge->shutdown();
mgrPrx->removeListener(listenerPrx);
-
- BridgeSeq bridges = mgrPrx->listBridges();
-
- // Activate the secondary component so we can
- // query it.
- ReplicaPrx secondaryReplica = env()->secondaryReplicaControl();
- ReplicaPrx primaryReplica = env()->primaryReplicaControl();
- BOOST_CHECK(primaryReplica->isActive() == true);
-
- primaryReplica->standby();
- BOOST_CHECK(primaryReplica->isActive() == false);
-
- secondaryReplica->activate();
- BOOST_CHECK(secondaryReplica->isActive() == true);
-
- BridgeSeq bridges2 = mgrPrx2->listBridges();
- if (bridges.size() != bridges2.size())
- {
- stringstream os;
- os << __FILE__ << ':' << __LINE__ << " Bridge count differs, primary: " << bridges.size() << " vs: "
- << bridges2.size();
- BOOST_MESSAGE(os.str());
- }
-
- BOOST_CHECK(bridges.size() == bridges2.size());
-
- // Set the components back to original state.
- secondaryReplica->standby();
- primaryReplica->activate();
-
- BOOST_CHECK(primaryReplica->isActive() == true);
- BOOST_CHECK(secondaryReplica->isActive() == false);
-
}
catch (const Ice::Exception& ex)
{
@@ -1094,6 +1104,7 @@ public:
AsteriskSCF::SessionCommunications::V1::BridgeManagerPrx mgrPrx = env()->primaryBridgeManager();
BOOST_CHECK(mgrPrx);
+ cleanupBridges(mgrPrx);
mgrPrx->addListener(listenerPrx);
BOOST_CHECK(servant->stoppingCalls() == 0);
BOOST_CHECK(servant->stoppedCalls() == 0);
@@ -1197,6 +1208,7 @@ public:
AsteriskSCF::SessionCommunications::V1::BridgeManagerPrx mgrPrx = env()->primaryBridgeManager();
BOOST_CHECK(mgrPrx);
+ cleanupBridges(mgrPrx);
mgrPrx->addListener(listenerPrx);
BOOST_CHECK(servant->stoppingCalls() == 0);
BOOST_CHECK(servant->stoppedCalls() == 0);
@@ -1304,6 +1316,7 @@ public:
AsteriskSCF::SessionCommunications::V1::BridgeManagerPrx mgrPrx = env()->primaryBridgeManager();
BOOST_CHECK(mgrPrx);
+ cleanupBridges(mgrPrx);
mgrPrx->addListener(listenerPrx);
BOOST_CHECK(servant->stoppingCalls() == 0);
BOOST_CHECK(servant->stoppedCalls() == 0);
@@ -1409,6 +1422,7 @@ public:
BridgeManagerPrx mgrPrx = env()->primaryBridgeManager();
BOOST_CHECK(mgrPrx);
+ cleanupBridges(mgrPrx);
mgrPrx->addListener(listenerPrx);
SessionSeq sessions;
@@ -1493,6 +1507,10 @@ public:
BOOST_FAIL("Unexpected exception");
}
}
+ catch (const Ice::Exception& ex)
+ {
+ BOOST_FAIL(ex.what());
+ }
catch (...)
{
BOOST_FAIL("Unexpected exception");
@@ -1509,12 +1527,12 @@ bool init_unit_test()
{
boost::shared_ptr<BridgeTester> bridgeTester(new BridgeTester(globalTestEnvironment));
framework::master_test_suite().
- add(BOOST_TEST_CASE(boost::bind(&BridgeTester::telephonyConnectTest, bridgeTester)));
- framework::master_test_suite().
add(BOOST_TEST_CASE(boost::bind(&BridgeTester::createEmptyBridge, bridgeTester)));
framework::master_test_suite().
add(BOOST_TEST_CASE(boost::bind(&BridgeTester::simpleBridgingTest, bridgeTester)));
framework::master_test_suite().
+ add(BOOST_TEST_CASE(boost::bind(&BridgeTester::telephonyConnectTest, bridgeTester)));
+ framework::master_test_suite().
add(BOOST_TEST_CASE(boost::bind(&BridgeTester::testAutoShutdown, bridgeTester)));
framework::master_test_suite().
add(BOOST_TEST_CASE(boost::bind(&BridgeTester::bridgeDefaultListenerTest, bridgeTester)));
-----------------------------------------------------------------------
--
asterisk-scf/release/bridging.git
More information about the asterisk-scf-commits
mailing list