[asterisk-scf-commits] asterisk-scf/integration/bridging.git branch "single-build-dir" created.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Tue Apr 26 09:05:29 CDT 2011
branch "single-build-dir" has been created
at ba42f30351e094f5fe76b24a496d7e1baa3a11a6 (commit)
- Log -----------------------------------------------------------------
commit ba42f30351e094f5fe76b24a496d7e1baa3a11a6
Author: Kevin P. Fleming <kpfleming at digium.com>
Date: Mon Apr 25 17:49:26 2011 -0500
Changes to work with new single-build-directory CMake script.
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 9979a32..e8432c0 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -16,9 +16,6 @@ if(NOT integrated_build STREQUAL "true")
# Include common AsteriskSCF build infrastructure
include(cmake/AsteriskSCF.cmake)
- # This project is C++ based and requires a minimum of 3.4
- asterisk_scf_project(BridgingService 3.4 CXX)
-
# Take care of slice definitions
add_subdirectory(slice)
@@ -27,6 +24,8 @@ if(NOT integrated_build STREQUAL "true")
add_subdirectory(logger)
endif()
+asterisk_scf_project(BridgingService 3.4)
+
add_subdirectory(src)
add_subdirectory(config)
add_subdirectory(test)
diff --git a/config/test_bridging.conf.in b/config/test_bridging.conf.in
index 37d7b8c..8aa0cf7 100644
--- a/config/test_bridging.conf.in
+++ b/config/test_bridging.conf.in
@@ -108,13 +108,13 @@ ServiceLocatorManagementProxy=LocatorServiceManagement:tcp -p 4422
#
# The IceBox entries for loading the services.
#
-IceBox.Service.Logger=${logger_bindir}/server/src@logging-service:createLoggingService
-IceBox.Service.Replicator=../src@BridgeReplicator:create
-IceBox.Service.TestBridge=../src@bridgeservice:create
-IceBox.Service.TestBridge2=../src/@bridgeservice:create
-IceBox.Service.TestServiceLocator=${service_locator_bindir}/src@service_locator:create
-IceBox.Service.TestChannel=${test_channel_bindir}/src@test_channel:create
-IceBox.Service.TestDriver=../test@bridge_component_test:create
+IceBox.Service.Logger=logging-service:createLoggingService
+IceBox.Service.Replicator=BridgeReplicator:create
+IceBox.Service.TestBridge=bridgeservice:create
+IceBox.Service.TestBridge2=bridgeservice:create
+IceBox.Service.TestServiceLocator=service_locator:create
+IceBox.Service.TestChannel=test_channel:create
+IceBox.Service.TestDriver=bridge_component_test:create
#
# The bridging service uses the test channel's Endpoint
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index 2dfcc0e..b74c1e9 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -34,7 +34,7 @@ target_link_libraries(bridge_component_test test-channel-api)
# component tests only work for integrated builds
if(integrated_build STREQUAL "true")
- icebox_add_test(bridge_component_test ../config/test_bridging.conf)
+ asterisk_scf_test_icebox(bridge_component_test config/test_bridging.conf)
endif()
asterisk_scf_component_init(bridge_unit_tests CXX)
commit 665b6a29b567c13c6bbce45fab4ca422bf2e17aa
Author: Joshua Colp <jcolp at digium.com>
Date: Thu Apr 21 13:35:25 2011 -0300
Add support for indicate.
diff --git a/src/SessionListener.cpp b/src/SessionListener.cpp
index 2cf579f..b55ef67 100644
--- a/src/SessionListener.cpp
+++ b/src/SessionListener.cpp
@@ -42,105 +42,85 @@ public:
{
}
- void connected(const SessionPrx& source, const Ice::Current&)
+ void indicated(const AsteriskSCF::SessionCommunications::V1::SessionPrx& source,
+ const AsteriskSCF::SessionCommunications::V1::IndicationPtr& indication, const Ice::Current& current)
{
- string proxyString = source->ice_toString();
- mLogger(Debug) << FUNLOG << ": session connected " << proxyString;
-
- try
- {
- SessionWrapperPtr session = mSessions->getSession(source);
- if (!session)
- {
- mLogger(Info) << "Attempt to respond to connected notification for session with proxy "
- << proxyString << " that does not match known sessions. Possible out of order operations.";
- return;
- }
-
- session->setup();
+ AsteriskSCF::SessionCommunications::V1::ConnectedIndicationPtr connected;
+ AsteriskSCF::SessionCommunications::V1::RingingIndicationPtr ringing;
+ AsteriskSCF::SessionCommunications::V1::StoppedIndicationPtr stopped;
+
+ if ((connected = AsteriskSCF::SessionCommunications::V1::ConnectedIndicationPtr::dynamicCast(indication)))
+ {
+ string proxyString = source->ice_toString();
+ mLogger(Debug) << FUNLOG << ": session connected " << proxyString;
+
+ try
+ {
+ SessionWrapperPtr session = mSessions->getSession(source);
+ if (!session)
+ {
+ mLogger(Info) << "Attempt to respond to connected notification for session with proxy "
+ << proxyString << " that does not match known sessions. Possible out of order operations.";
+ return;
+ }
+
+ session->setup();
- ConnectSessionOperation connector(source, mLogger);
- mSessions->visitSessions(connector);
- }
- catch (const Ice::Exception& ex)
- {
- mLogger(Debug) << FUNLOG << ": " << ex.what();
- throw;
- }
- }
-
- void flashed(const SessionPrx& source, const Ice::Current&)
- {
- //
- // TODO: Is there something to be done here?
- //
- }
-
- void held(const SessionPrx& source, const Ice::Current&)
- {
- //
- // TODO: Should this affect the media operations.
- //
- }
-
- void progressing(const SessionPrx& source,
- const ResponseCodePtr& response, const Ice::Current&)
- {
- //
- // TODO: Is there something to be done here?
- //
- }
-
- void ringing(const SessionPrx& source, const Ice::Current&)
- {
- //
- // TODO: Who gets the ring notifications will likely depend on configuration, etc.
- //
- RingSessionOperation ringer(source, mLogger);
- mSessions->visitSessions(ringer);
- }
-
- void stopped(const SessionPrx& source, const ResponseCodePtr& response, const Ice::Current& current)
- {
- string proxyString = source->ice_toString();
- mLogger(Debug) << FUNLOG << ": session stopped " << proxyString;
-
- //
- // IMPLNOTE: The AMI approach.
- // Stack some operations in a callback. In this case it will be:
- // 1. getSession()
- // 2. removeBridge()
- // 3. disconnect() or destroy()
- //
- // The latter two should be coupled as a transaction. One way to go about this would be to replicate the
- // operations that need to be done together and checkpoint after each one completes. Actually, there are lots of
- // ways to go about it. Need to think on this more.
- //
- // Q. What is the proper order of operations here, should removeBridge be called before or after we
- // "disconnect" the session?
- //
- SessionWrapperPtr session;
- session = mSessions->getSession(source);
- if (!session)
- {
- mLogger(Info) << "Attempt to respond to connected notification for session with proxy "
- << proxyString << " that does not match known sessions. Possible out of order operations.";
- return;
- }
-
- //
- // We don't actually have the proxy for this object, but we can create one on the fly.
- //
- SessionListenerPrx listenerPrx = SessionListenerPrx::uncheckedCast(current.adapter->createProxy(current.id));
-
- //
- // Shutdown is handled asynchronously, so there won't be any exceptions that need to be caught here.
- //
- session->shutdown(listenerPrx, new ResponseCode);
- }
-
- void unheld(const SessionPrx& source, const Ice::Current&)
- {
+ ConnectSessionOperation connector(source, mLogger);
+ mSessions->visitSessions(connector);
+ }
+ catch (const Ice::Exception& ex)
+ {
+ mLogger(Debug) << FUNLOG << ": " << ex.what();
+ throw;
+ }
+ }
+ else if ((ringing = AsteriskSCF::SessionCommunications::V1::RingingIndicationPtr::dynamicCast(indication)))
+ {
+ //
+ // TODO: Who gets the ring notifications will likely depend on configuration, etc.
+ //
+ RingSessionOperation ringer(source, mLogger);
+ mSessions->visitSessions(ringer);
+ }
+ else if ((stopped = AsteriskSCF::SessionCommunications::V1::StoppedIndicationPtr::dynamicCast(indication)))
+ {
+ string proxyString = source->ice_toString();
+ mLogger(Debug) << FUNLOG << ": session stopped " << proxyString;
+
+ //
+ // IMPLNOTE: The AMI approach.
+ // Stack some operations in a callback. In this case it will be:
+ // 1. getSession()
+ // 2. removeBridge()
+ // 3. disconnect() or destroy()
+ //
+ // The latter two should be coupled as a transaction. One way to go about this would be to replicate the
+ // operations that need to be done together and checkpoint after each one completes. Actually, there are lots of
+ // ways to go about it. Need to think on this more.
+ //
+ // Q. What is the proper order of operations here, should removeBridge be called before or after we
+ // "disconnect" the session?
+ //
+ SessionWrapperPtr session;
+ session = mSessions->getSession(source);
+ if (!session)
+ {
+ mLogger(Info) << "Attempt to respond to connected notification for session with proxy "
+ << proxyString << " that does not match known sessions. Possible out of order operations.";
+ return;
+ }
+
+ //
+ // We don't actually have the proxy for this object, but we can create one on the fly.
+ //
+ SessionListenerPrx listenerPrx = SessionListenerPrx::uncheckedCast(current.adapter->createProxy(current.id));
+
+ //
+ // Shutdown is handled asynchronously, so there won't be any exceptions that need to be caught here.
+ //
+ session->shutdown(listenerPrx, new ResponseCode);
+ }
}
private:
diff --git a/src/SessionOperations.cpp b/src/SessionOperations.cpp
index 0e982aa..56a5c5a 100644
--- a/src/SessionOperations.cpp
+++ b/src/SessionOperations.cpp
@@ -70,7 +70,7 @@ void RingSessionOperation::operator()(const SessionWrapperPtr& session)
//
// TODO: AMI.. or would this be better as a oneway. Do we care if we get a response etc?
//
- s->ring();
+ s->indicate(new AsteriskSCF::SessionCommunications::V1::RingIndication());
}
catch (const Ice::ObjectNotExistException& ex)
{
diff --git a/src/SessionWrapper.cpp b/src/SessionWrapper.cpp
index a13dd18..a4da471 100644
--- a/src/SessionWrapper.cpp
+++ b/src/SessionWrapper.cpp
@@ -343,10 +343,11 @@ void SessionWrapper::connect()
// care of dealing with the AMI callbacks. More accurately, there is a class wide singleton for the callback
// object and the SessionWrapper itself is passed as the cookie. No further "book-keeping" is required.
//
- mSession->session->begin_connect(
- newCallback_Session_connect(new ConnectRequestCallback(this, mLogger),
- &ConnectRequestCallback::connectCB,
- &ConnectRequestCallback::failureCB));
+ mSession->session->begin_indicate(
+ new AsteriskSCF::SessionCommunications::V1::ConnectIndication(),
+ newCallback_Session_indicate(new ConnectRequestCallback(this, mLogger),
+ &ConnectRequestCallback::connectCB,
+ &ConnectRequestCallback::failureCB));
}
}
@@ -359,7 +360,7 @@ void SessionWrapper::ring()
return;
}
}
- tryOneWay(mSession->session)->ring();
+ tryOneWay(mSession->session)->indicate(new AsteriskSCF::SessionCommunications::V1::RingIndication());
}
bool SessionWrapper::setConnected()
commit 99fbea34400b3eedcbee29014104cf60b332555a
Author: Brent Eagles <beagles at digium.com>
Date: Tue Apr 19 10:56:02 2011 -0230
Address shadow warnings in recent commits.
diff --git a/src/BridgeImpl.cpp b/src/BridgeImpl.cpp
index 890d0b3..986aa73 100755
--- a/src/BridgeImpl.cpp
+++ b/src/BridgeImpl.cpp
@@ -106,7 +106,7 @@ public:
void activate();
string id();
- SessionCollectionPtr sessions();
+ SessionCollectionPtr getSessions();
void forceUpdate();
@@ -400,7 +400,7 @@ public:
protected:
bool executeImpl()
{
- if (mBridge->sessions()->size() < 2 && mPrx)
+ if (mBridge->getSessions()->size() < 2 && mPrx)
{
mPrx->begin_shutdown(newCallback_Bridge_shutdown(this, &CheckShutdown::done,
&CheckShutdown::failed));
@@ -908,7 +908,7 @@ string BridgeImpl::id()
return mState->bridgeId;
}
-SessionCollectionPtr BridgeImpl::sessions()
+SessionCollectionPtr BridgeImpl::getSessions()
{
return mSessions;
}
diff --git a/src/BridgeImpl.h b/src/BridgeImpl.h
index 118686a..f764355 100644
--- a/src/BridgeImpl.h
+++ b/src/BridgeImpl.h
@@ -105,7 +105,7 @@ public:
virtual std::string id() = 0;
- virtual SessionCollectionPtr sessions() = 0;
+ virtual SessionCollectionPtr getSessions() = 0;
/**
*
diff --git a/src/BridgeManagerImpl.cpp b/src/BridgeManagerImpl.cpp
index 4ce7a1b..be58b67 100644
--- a/src/BridgeManagerImpl.cpp
+++ b/src/BridgeManagerImpl.cpp
@@ -79,7 +79,7 @@ public:
void activate();
- string id()
+ string getID()
{
return mName;
}
diff --git a/src/BridgeManagerImpl.h b/src/BridgeManagerImpl.h
index f9f2c71..dd7fad3 100644
--- a/src/BridgeManagerImpl.h
+++ b/src/BridgeManagerImpl.h
@@ -50,7 +50,7 @@ public:
virtual void activate() = 0;
- virtual std::string id() = 0;
+ virtual std::string getID() = 0;
virtual void createBridgeReplica(const AsteriskSCF::Bridge::V1::BridgeStateItemPtr& bridgeState) = 0;
diff --git a/src/BridgeReplicatorStateListenerI.cpp b/src/BridgeReplicatorStateListenerI.cpp
index 0f47d91..5186463 100644
--- a/src/BridgeReplicatorStateListenerI.cpp
+++ b/src/BridgeReplicatorStateListenerI.cpp
@@ -62,7 +62,7 @@ public:
{
if ((*b) && (*b)->id() == bridgedSessionItem->bridgeId)
{
- SessionCollectionPtr sessions = (*b)->sessions();
+ SessionCollectionPtr sessions = (*b)->getSessions();
sessions->removeSession(bridgedSessionItem);
//
// Keep the session list clean.
@@ -147,7 +147,7 @@ public:
BridgeManagerStateItemPtr managerItem = BridgeManagerStateItemPtr::dynamicCast((*i));
if (managerItem)
{
- if(managerItem->key == mManager->id())
+ if(managerItem->key == mManager->getID())
{
//
// There is only one bridge per listener instance by design/implementation, so this
@@ -198,7 +198,7 @@ public:
{
if ((*b) && (*b)->id() == bridgedSessionItem->bridgeId)
{
- SessionCollectionPtr sessions = (*b)->sessions();
+ SessionCollectionPtr sessions = (*b)->getSessions();
sessions->replicaUpdate(bridgedSessionItem);
//
// Keep the session list clean.
@@ -226,7 +226,7 @@ public:
{
if ((*b) && (*b)->id() == sessionPairing->bridgeKey)
{
- SessionWrapperPtr session = (*b)->sessions()->getSession(sessionPairing->sessionKey);
+ SessionWrapperPtr session = (*b)->getSessions()->getSession(sessionPairing->sessionKey);
if (session)
{
session->updateMedia(sessionPairing);
diff --git a/src/MediaSplicer.cpp b/src/MediaSplicer.cpp
index b0dd7f1..233df9e 100755
--- a/src/MediaSplicer.cpp
+++ b/src/MediaSplicer.cpp
@@ -190,48 +190,48 @@ public:
const IncomingPairings& incoming)
{
mLogger(Debug) << FUNLOG << ": establishing a new peer connection.";
- SessionPairingPtr update;
+ SessionPairingPtr currentState;
{
boost::unique_lock<boost::shared_mutex> lock(mLock);
mPeer = peer;
mOutgoing = outgoing;
mIncoming = incoming;
- update = createUpdate();
+ currentState = createUpdate();
}
- pushUpdate(update);
+ pushUpdate(currentState);
}
SessionPairingPtr createUpdate()
{
if (mReplicator)
{
- SessionPairingPtr update(new SessionPairing);
- update->mediaSession = mMedia;
- update->bridgeKey = mBridgeId;
- update->sessionKey = mSessionId;
- update->key = mKey;
- update->serial = ++mRepCounter;
+ SessionPairingPtr currentState(new SessionPairing);
+ currentState->mediaSession = mMedia;
+ currentState->bridgeKey = mBridgeId;
+ currentState->sessionKey = mSessionId;
+ currentState->key = mKey;
+ currentState->serial = ++mRepCounter;
for (vector<OutgoingPairing>::iterator i = mOutgoing.begin(); i != mOutgoing.end(); ++i)
{
- update->outgoingMediaPairings.push_back(new MediaPairing(i->second, i->first));
+ currentState->outgoingMediaPairings.push_back(new MediaPairing(i->second, i->first));
}
for (vector<IncomingPairing>::iterator i = mIncoming.begin(); i != mIncoming.end(); ++i)
{
- update->incomingMediaPairings.push_back(new MediaPairing(i->first, i->second));
+ currentState->incomingMediaPairings.push_back(new MediaPairing(i->first, i->second));
}
- return update;
+ return currentState;
}
return 0;
}
- void pushUpdate(const SessionPairingPtr& update)
+ void pushUpdate(const SessionPairingPtr& newState)
{
- if (mReplicator && update)
+ if (mReplicator && newState)
{
try
{
ReplicatedStateItemSeq seq;
- seq.push_back(update);
+ seq.push_back(newState);
mReplicator->setState(seq);
}
catch (const std::exception& ex)
@@ -253,7 +253,7 @@ public:
{
vector<OutgoingPairing> outgoing;
vector<IncomingPairing> incoming;
- SessionPairingPtr update;
+ SessionPairingPtr newState;
{
boost::unique_lock<boost::shared_mutex> lock(mLock);
outgoing = mOutgoing;
@@ -261,9 +261,9 @@ public:
incoming = mIncoming;
mIncoming.clear();
mConnected = false;
- update = createUpdate();
+ newState = createUpdate();
}
- pushUpdate(update);
+ pushUpdate(newState);
if (outgoing.size() == 0 && incoming.size() == 0)
{
return;
@@ -277,7 +277,8 @@ public:
//
for (vector<OutgoingPairing>::iterator i = outgoing.begin(); i != outgoing.end(); ++i)
{
- mLogger(Debug) << FUNLOG << ": disconnecting " << i->second->ice_toString() << " and " << i->first->ice_toString();
+ mLogger(Debug) << FUNLOG << ": disconnecting " << i->second->ice_toString() << " and "
+ << i->first->ice_toString();
try
{
@@ -318,17 +319,17 @@ public:
}
}
- void update(const SessionPairingPtr& update)
+ void update(const SessionPairingPtr& newState)
{
boost::unique_lock<boost::shared_mutex> lock(mLock);
- mRepCounter = update->serial;
+ mRepCounter = newState->serial;
mOutgoing.clear();
mIncoming.clear();
- for (MediaPairingSeq::iterator i = update->outgoingMediaPairings.begin(); i != update->outgoingMediaPairings.end(); ++i)
+ for (MediaPairingSeq::iterator i = newState->outgoingMediaPairings.begin(); i != newState->outgoingMediaPairings.end(); ++i)
{
mOutgoing.push_back(OutgoingPairing((*i)->sink, (*i)->source));
}
- for (MediaPairingSeq::iterator i = update->incomingMediaPairings.begin(); i != update->incomingMediaPairings.end(); ++i)
+ for (MediaPairingSeq::iterator i = newState->incomingMediaPairings.begin(); i != newState->incomingMediaPairings.end(); ++i)
{
mIncoming.push_back(IncomingPairing((*i)->source, (*i)->sink));
}
@@ -339,7 +340,7 @@ public:
mLogger(Debug) << FUNLOG << ": unplugging sinks and sources.";
vector<OutgoingPairing> outgoing;
vector<IncomingPairing> incoming;
- SessionPairingPtr update;
+ SessionPairingPtr newState;
{
boost::unique_lock<boost::shared_mutex> lock(mLock);
outgoing = mOutgoing;
@@ -347,9 +348,9 @@ public:
incoming = mIncoming;
mIncoming.clear();
mConnected = false;
- update = createUpdate();
+ newState = createUpdate();
}
- pushUpdate(update);
+ pushUpdate(newState);
if (outgoing.size() == 0 && incoming.size() == 0)
{
return false;
diff --git a/src/SessionWrapper.cpp b/src/SessionWrapper.cpp
index 336ab16..a13dd18 100644
--- a/src/SessionWrapper.cpp
+++ b/src/SessionWrapper.cpp
@@ -64,29 +64,29 @@ public:
{
ex.ice_throw();
}
- catch (const Ice::RequestFailedException& ex)
+ catch (const Ice::RequestFailedException& x)
{
//
// These exceptions indicate something is wrong with the request on this object. We should not expect that
// it could ever succeed. ObjectNotExistException is included in this catch.
// TODO- it might be reasonable to disconnect this session for this!
//
- mLogger(Warning) << "exception when calling connect() on " << mSession->id() << ": " << ex.what();
+ mLogger(Warning) << "exception when calling connect() on " << mSession->id() << ": " << x.what();
mSession->destroy();
}
- catch (const Ice::SocketException& ex)
+ catch (const Ice::SocketException& x)
{
//
// A possible retry scenario.
//
- mLogger(Warning) << "exception when calling connect() on " << mSession->id() << ": " << ex.what();
+ mLogger(Warning) << "exception when calling connect() on " << mSession->id() << ": " << x.what();
}
- catch (const std::exception& ex)
+ catch (const std::exception& x)
{
//
// Not much to be done here except log the error and move on.
//
- mLogger(Warning) << "exception when calling connect() on " << mSession->id() << ": " << ex.what();
+ mLogger(Warning) << "exception when calling connect() on " << mSession->id() << ": " << x.what();
}
}
@@ -365,17 +365,17 @@ void SessionWrapper::ring()
bool SessionWrapper::setConnected()
{
mLogger(Debug) << FUNLOG << " for " << mId;
- BridgedSessionPtr update;
+ BridgedSessionPtr stateUpdate;
{
boost::unique_lock<boost::shared_mutex> lock(mLock);
if (mSession->currentState == Added)
{
mSession->currentState = Connected;
- update = createUpdate();
+ stateUpdate = createUpdate();
}
}
- pushUpdate(update);
- return (update != 0);
+ pushUpdate(stateUpdate);
+ return (stateUpdate != 0);
}
BridgedSessionPtr SessionWrapper::getBridgedSession() const
@@ -393,17 +393,17 @@ SessionPrx SessionWrapper::getSession() const
void SessionWrapper::setupMedia()
{
mLogger(Debug) << FUNLOG << " for " << mId;
- BridgedSessionPtr update;
+ BridgedSessionPtr stateUpdate;
{
boost::unique_lock<boost::shared_mutex> lock(mLock);
if (mSession->currentState == Added)
{
mSplicer->connect(this);
mSession->currentState = Connected;
- update = createUpdate();
+ stateUpdate = createUpdate();
}
}
- pushUpdate(update);
+ pushUpdate(stateUpdate);
}
void SessionWrapper::setConnector(const MediaConnectorPtr& connector)
@@ -432,7 +432,7 @@ void SessionWrapper::updateMedia(const SessionPairingPtr& pairings)
void SessionWrapper::disconnect()
{
mLogger(Debug) << FUNLOG << ": disconnecting " << mId;
- BridgedSessionPtr update;
+ BridgedSessionPtr stateUpdate;
{
boost::unique_lock<boost::shared_mutex> lock(mLock);
if (mSession->currentState != Disconnected &&
@@ -440,31 +440,31 @@ void SessionWrapper::disconnect()
{
unplugMedia();
mSession->currentState = Disconnected;
- update = createUpdate();
+ stateUpdate = createUpdate();
}
}
- pushUpdate(update);
+ pushUpdate(stateUpdate);
}
-void SessionWrapper::update(const BridgedSessionPtr& update)
+void SessionWrapper::update(const BridgedSessionPtr& newState)
{
mLogger(Debug) << FUNLOG << ": for " << mId;
boost::unique_lock<boost::shared_mutex> lock(mLock);
- if (mSession->serial < update->serial || update->serial < SerialCounterStart)
+ if (mSession->serial < newState->serial || newState->serial < SerialCounterStart)
{
- mSession = update;
+ mSession = newState;
}
else
{
- mLogger(Info) << ": detected out of order replication update for " << update->key <<
- "(" << mSession->serial << " to " << update->serial << ")";
+ mLogger(Info) << ": detected out of order replication update for " << newState->key <<
+ "(" << mSession->serial << " to " << newState->serial << ")";
}
}
void SessionWrapper::destroy()
{
mLogger(Debug) << FUNLOG << ": for " << mId;
- BridgedSessionPtr update;
+ BridgedSessionPtr newState;
{
boost::unique_lock<boost::shared_mutex> lock(mLock);
if (mSession->currentState != Done)
@@ -475,10 +475,10 @@ void SessionWrapper::destroy()
//
unplugMedia();
mSession->currentState = Done;
- update = createUpdate();
+ newState = createUpdate();
}
}
- pushUpdate(update);
+ pushUpdate(newState);
}
bool SessionWrapper::isDestroyed()
@@ -515,16 +515,16 @@ void SessionWrapper::shutdown(const SessionListenerPrx& listener, const Response
void SessionWrapper::setState(const AsteriskSCF::Bridge::V1::BridgedSessionState newState)
{
mLogger(Debug) << FUNLOG << ": updating state " << mId;
- BridgedSessionPtr update;
+ BridgedSessionPtr copyOfNewState;
{
boost::unique_lock<boost::shared_mutex> lock(mLock);
//
// TODO:
//
mSession->currentState = newState;
- update = createUpdate();
+ copyOfNewState = createUpdate();
}
- pushUpdate(update);
+ pushUpdate(copyOfNewState);
}
void SessionWrapper::unplugMedia()
@@ -542,12 +542,12 @@ void SessionWrapper::unplugMedia()
}
}
-void SessionWrapper::pushUpdate(const BridgedSessionPtr& update)
+void SessionWrapper::pushUpdate(const BridgedSessionPtr& newState)
{
- if (update && mReplicator)
+ if (newState && mReplicator)
{
ReplicatedStateItemSeq seq;
- seq.push_back(update);
+ seq.push_back(newState);
mReplicator->setState(seq);
}
}
diff --git a/src/Tasks.h b/src/Tasks.h
index bfaa6d5..57898b3 100644
--- a/src/Tasks.h
+++ b/src/Tasks.h
@@ -163,8 +163,8 @@ protected:
{
}
- QueuedTask(const std::string& name) :
- mName(name)
+ QueuedTask(const std::string& nameStr) :
+ mName(nameStr)
{
}
diff --git a/test/TestBridging.cpp b/test/TestBridging.cpp
index b43506a..41ee52c 100644
--- a/test/TestBridging.cpp
+++ b/test/TestBridging.cpp
@@ -73,10 +73,10 @@ public:
// usage is a little more expensive in that you are always setting whether it's set or not, but it
// avoids having the "if" statement.
//
- Ice::PropertiesPtr properties = mCommunicator->getProperties();
- propGetSet(properties, "TestUtilAdapter.ThreadPool.Size", "4");
- propGetSet(properties, "TestUtilAdapter.ThreadPool.Size", "4");
- propGetSet(properties, "TestUtilAdapter.Endpoints", "default");
+ Ice::PropertiesPtr props = mCommunicator->getProperties();
+ propGetSet(props, "TestUtilAdapter.ThreadPool.Size", "4");
+ propGetSet(props, "TestUtilAdapter.ThreadPool.Size", "4");
+ propGetSet(props, "TestUtilAdapter.Endpoints", "default");
lookupProxies();
}
@@ -247,7 +247,7 @@ public:
}
}
- AsteriskSCF::Core::Routing::V1::EndpointLocatorPrx locator()
+ AsteriskSCF::Core::Routing::V1::EndpointLocatorPrx getLocator()
{
return mLocatorPrx;
}
commit bae047c389685f743ff52036ee82429913bfe775
Author: Brent Eagles <beagles at digium.com>
Date: Tue Apr 19 09:32:21 2011 -0230
Merging async-bridging and bridge-replication branches to master.
Change summary:
- Added active->standby state replication of bridge related state.
- Added a replication service component for the above.
- Added some component test suite modifications for testing replication.
- The bridge service now uses AMI and AMD for key, potentially long running
methods.
- The bridge service implementation employs a preliminary version of
"workqueues". This will need to be converted to use the common Asterisk
SCF workqueue facility when it is completed.
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 498a40a..9979a32 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -17,7 +17,7 @@ if(NOT integrated_build STREQUAL "true")
include(cmake/AsteriskSCF.cmake)
# This project is C++ based and requires a minimum of 3.4
- asterisk_scf_project("bridging service" 3.4 CXX)
+ asterisk_scf_project(BridgingService 3.4 CXX)
# Take care of slice definitions
add_subdirectory(slice)
@@ -25,9 +25,6 @@ if(NOT integrated_build STREQUAL "true")
# logger is integrated into our build
set(integrated_build true)
add_subdirectory(logger)
- set(integrated_build false)
-else()
- set(bridging_bindir ${CMAKE_CURRENT_BINARY_DIR} PARENT_SCOPE)
endif()
add_subdirectory(src)
diff --git a/README.txt b/README.txt
index 15d1447..2d1f1b6 100644
--- a/README.txt
+++ b/README.txt
@@ -1,7 +1,7 @@
===============================================================================
=== Asterisk SCF - Bridging ===
=== ===
-=== Copyright (C) 2010, Digium, Inc. ===
+=== Copyright (C) 2010-2011, Digium, Inc. ===
===============================================================================
-------------------------------------------------------------------------------
diff --git a/config/test_bridging.conf.in b/config/test_bridging.conf.in
index c9e92e0..37d7b8c 100644
--- a/config/test_bridging.conf.in
+++ b/config/test_bridging.conf.in
@@ -1,37 +1,163 @@
+################################################################################
+# Configuration file for use with the bridging component's test driver.
+#
+#
+# General configuration
+#
+# We turn off all collocation optimization by default to avoid issues with any services
+# that might have trouble due to AMI/AMD usage.
+#
+Ice.Default.CollocationOptimized=0
+
+#
+# We use a single file for configuration.
+#
+IceBox.InheritProperties=1
+
+#
+# It's important to specify a reasonable size for the client thread pool for services
+# that use AMI/AMD
+#
+Ice.ThreadPool.Client.Size=4
+
+
+################################################################################
+# The following configuration is required for the Service Locator component
+# because it loads a collocated instance of IceStorm.
+#
+#
+# We register services with names to help identify them within an IceBox instance.
+# It also helps with identifying properties in a configuration file as each
+# property that is specific to this service will be prefixed with
+# AsteriskSCFIceStorm.
+#
AsteriskSCFIceStorm.InstanceName=AsteriskSCFIceStorm
-AsteriskSCFIceStorm.TopicManager.Endpoints=default -p 55555
-AsteriskSCFIceStorm.Publish.Endpoints=default -p 55556
+
+
+#
+# Configure the IceStorm TopicManager's object adapter.
+#
+AsteriskSCFIceStorm.TopicManager.Endpoints=default -p 10000
+AsteriskSCFIceStorm.TopicManager.ThreadPool.Size=4
+
+#
+# Configure the IceStorm publisher object adapter.
+#
+AsteriskSCFIceStorm.Publish.Endpoints=tcp -p 10001:udp -p 10001
+AsteriskSCFIceStorm.Publish.ThreadPool.Size=4
+
+#
+# This IceStorm instance will not need to persist subscriber/publisher
+# information across process lifetimes.
+#
AsteriskSCFIceStorm.Transient=1
+#
+# Control TopicManager tracing.
+#
+# 0 = no tracing
+# 1 = trace topic creation, subscription, unsubscription
+# 2 = like 1, but with more detailed subscription information
+#
+AsteriskSCFIceStorm.Trace.TopicManager=0
+
+#
+# Flush interval in case any subscribers have subscribed
+# using a batched oneway proxy. (Default is currently 1000ms)
+#
+AsteriskSCFIceStorm.Flush.Timeout=2000
+
+#
+# Finally the proxy that can be used to access this IceStorm instance.
+#
+TopicManager.Proxy=AsteriskSCFIceStorm/TopicManager:default -p 10000
+
+#
+# Logger service configuration. Proxies to the logger service
+# are obtained through the service locator so specifying a port
+# number is not required.
+#
LoggerAdapter.Endpoints=default
AsteriskSCF.LoggingService.Endpoints=default
+AsteriskSCF.LoggingService.ThreadPool.Size=4
AsteriskSCF.LoggingClient.Endpoints=default
-AsteriskSCF.TestChannelService.Endpoints=default -p 55560
-
-LocatorService.Proxy=LocatorService:default -p 55558
+AsteriskSCF.LoggingClient.ThreadPool.Size=4
-ServiceLocatorManagementAdapter.Endpoints=tcp -p 55557
-ServiceLocatorAdapter.Endpoints=tcp -p 55558
-ServiceLocatorManagementProxy=LocatorServiceManagement:tcp -p 55557
+#
+# Service Locator configuration. Usually the service locator
+# and its collocated IceStorm instance are the only services
+# that need to have their ports specified. The service locator
+# instantiates two adapters:
+# * The ServiceLocatorAdapter which hosts the servants that implement
+# the lookup queries for clients wishing to obtain proxies to services.
+# * The ServiceLocatorManagementAdapter which hosts the servants for the
+# objects that implement the registration management for services
+#
+ServiceLocatorAdapter.Endpoints=tcp -p 4411
+ServiceLocatorAdapter.ThreadPool.Size=4
+ServiceLocatorManagementAdapter.Endpoints=tcp -p 4422
+ServiceLocatorManagementAdapter.ThreadPool.Size=4
-TopicManager.Proxy=AsteriskSCFIceStorm/TopicManager:default -p 55555
+#
+# The proxies that clients use to access the Service Locator facilities.
+#
+LocatorService.Proxy=LocatorService:default -p 4411
+ServiceLocatorManagementProxy=LocatorServiceManagement:tcp -p 4422
-IceBox.InheritProperties=1
+#
+# The IceBox entries for loading the services.
+#
IceBox.Service.Logger=${logger_bindir}/server/src@logging-service:createLoggingService
+IceBox.Service.Replicator=../src@BridgeReplicator:create
IceBox.Service.TestBridge=../src@bridgeservice:create
+IceBox.Service.TestBridge2=../src/@bridgeservice:create
IceBox.Service.TestServiceLocator=${service_locator_bindir}/src@service_locator:create
IceBox.Service.TestChannel=${test_channel_bindir}/src@test_channel:create
-IceBox.Service.TestDriver=../test@bridging_unit_test:create
+IceBox.Service.TestDriver=../test@bridge_component_test:create
+
+#
+# The bridging service uses the test channel's Endpoint
+# to create test sessions and register them with the bridge.
+# NOTE: this will be changed to be accessible through
+# service discovery in a later branch.
+#
+TestChannel.InstanceName=BridgeTest
+#
+# Configuration for the test bridge instances. There are two: one master
+# (TestBridge) and one standby (TestBridge2).
+# NOTE: These will be changed to be accessible through
+# service discovery in a later branch.
+#
TestBridge.InstanceName=TestBridge
-TestBridge.BridgeService.Endpoints=default -p 55561
-TestBridge.Proxy=BridgeManager:default -p 55561
-TestChannel.Proxy=TestChannel:default -p 55560
-TestUtilAdapter.Endpoints=default -p 55562
-Commands.Proxy=TestChannel.Locator.Commands:default -p 55560
+TestBridge.ManagerId=TestBridgeManager
+
+TestBridge2.InstanceName=TestBridge2
+TestBridge2.ManagerId=TestBridgeManager2
+TestBridge2.StateReplicatorListener=yes
+
+#
+# Configuration for the bridge state replicator.
+# NOTE: This will be changed to only use service discovery in a later
+# branch (i.e. it will not be necessary to configure the object adapter
+# or specify the proxy in the future)
+#
+Replicator.InstanceName=Replicator
-IceBox.LoadOrder=TestServiceLocator Logger TestBridge TestChannel TestDriver
+#
+# Some IceBox configuration.
+#
+#
+# Multiple services are loaded in the single IceBox instance. It is necessary to have
+# them start in a specific order so dependencies are available when they are needed.
+#
+IceBox.LoadOrder=TestServiceLocator Logger Replicator TestBridge TestBridge2 TestChannel TestDriver
+#
+# Configuring a manager endpoint for the IceBox service allows services to be managed and
+# monitored.
+#
IceBox.ServiceManager.Endpoints=default -p 56000
+IceBox.ServiceManager.ThreadPool.Size=4
IceBoxMgr.Proxy=IceBox/ServiceManager:default -p 56000
diff --git a/src/BridgeImpl.cpp b/src/BridgeImpl.cpp
index b799ca0..890d0b3 100755
--- a/src/BridgeImpl.cpp
+++ b/src/BridgeImpl.cpp
@@ -1,7 +1,7 @@
/*
* Asterisk SCF -- An open-source communications framework.
*
- * Copyright (C) 2010, Digium, Inc.
+ * Copyright (C) 2010-2011, Digium, Inc.
*
* See http://www.asterisk.org for more information about
* the Asterisk SCF project. Please do not directly contact
@@ -14,443 +14,506 @@
* at the top of the source tree.
*/
#include "BridgeImpl.h"
+
#include <AsteriskSCF/logger.h>
#include <AsteriskSCF/System/Component/ComponentServiceIf.h>
#include <AsteriskSCF/SessionCommunications/SessionCommunicationsIf.h>
#include <Ice/Ice.h>
+#include <boost/thread/locks.hpp>
#include <memory>
#include <algorithm>
-#include <boost/thread/locks.hpp>
+#include <set>
+#include "ServiceUtil.h"
+#include "DebugUtil.h"
+
+#include "SessionWrapper.h"
+#include "SessionOperations.h"
+#include "SessionListener.h"
using namespace AsteriskSCF::System::Logging;
+using namespace AsteriskSCF::SessionCommunications::V1;
+using namespace AsteriskSCF::BridgeService;
+using namespace AsteriskSCF::Bridge::V1;
+using namespace AsteriskSCF;
+using namespace std;
/**
*
- * NOTE: Code must be reviewed/refactored for exception safety/consistency.
- * Operations involving establishing media connections may or may not be
- * catastrophic. An example of a non-catastrophic situation might be a media
- * allocation operation that might immediately fail for transient reasons but
- * can be initialized in the background in a relatively timely fashion (of
- * course this would depend on the context).
+ * NOTE: Once asynchronous invocations began to be added to the mix, several shortcomings of both the initial
+ * implementation of the bridge and its replication became apparent. In the previous implementation, the bridge
+ * operations were very "serial" when it came to adding and removing sessions. When converting these operations to
+ * AMD/AMI, the serial approach completely falls apart. Especially when replication is thrown into the mix. For example,
+ * in a typical AMD/AMI scenario, the setBridge() and media allocation operations would occur as AMI requests. The
+ * apparent outcome will be that the participants will "appear" on the call in the order in which the operations
+ * complete (i.e. randomly). Indeed it is possible that some will ultimately fail and some sessions might actually be
+ * removed from the bridge before they are able to complete. Reconciling the completely asynchronous nature to an
+ * implementation is just too awful. As asynchronous support in bridging is the driving factor behind these changes, the
+ * refactoring for replication could have occurred in that development branch. However, replication is a separate task
+ * and it is desirable to have a *relevant* review of it independently of asynchronous support.
*
*/
namespace
{
-Logger lg = getLoggerFactory().getLogger("AsteriskSCF.BridgeService");
-class RetryPolicy
+/**
+ *
+ * BridgeImpl is a reference implementation of the AsteriskSCF::Bridging::V1::Bridge
+ * interface.
+ *
+ **/
+class BridgeImpl : virtual public BridgeServant
{
public:
- RetryPolicy(size_t maxRetries, size_t intervalInMilliseconds) :
- mMaxRetries(maxRetries),
- mRetryInterval(intervalInMilliseconds),
- mCounter(0)
- {
- }
+ BridgeImpl(const string& name, const Ice::ObjectAdapterPtr& objAdapter,
+ const vector<BridgeListenerPrx>& listeners,
+ const BridgeListenerMgrPtr& listenerMgr,
+ const ReplicatorSmartPrx& replicator,
+ const BridgeStateItemPtr& state,
+ const Logger& logger);
- bool canRetry()
- {
- return mCounter < mMaxRetries;
- }
+ ~BridgeImpl();
- bool retry()
- {
- lg(Debug) << "Retrying for the " << mCounter + 1 << " time.";
- IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(mRetryInterval));
- ++mCounter;
- return canRetry();
- }
+ //
+ // AsteriskSCF::SessionCommunications::Bridging::Bridge Interface
+ //
+ void addSessions_async(const AMD_Bridge_addSessionsPtr& callback, const SessionSeq& sessions, const Ice::Current&);
+ void removeSessions(const SessionSeq& sessions, const Ice::Current& current);
+ void removeSessions_async(const AMD_Bridge_removeSessionsPtr& callback, const SessionSeq& sessions,
+ const Ice::Current&);
+
+ SessionSeq listSessions(const Ice::Current&);
+ void shutdown(const Ice::Current& current);
+ void destroy(const Ice::Current& current);
+
+ void addListener(const BridgeListenerPrx& listener, const Ice::Current& current);
+ void removeListener(const BridgeListenerPrx& listener, const Ice::Current& current);
+
+ void replaceSession_async(const AMD_Bridge_replaceSessionPtr& callbac, const SessionPrx& sessionToReplace,
+ const SessionSeq& newSessions, const Ice::Current& current);
+
+ //
+ // BridgeServant methods
+ //
+ bool destroyed();
+ void destroyImpl();
+ void shutdownImpl(const Ice::Current& current);
+ void activate(const BridgePrx& proxy);
+
+ void updateState(const BridgeStateItemPtr& state);
+ void addListener(const BridgeListenerStateItemPtr& update);
+ void removeListener(const BridgeListenerStateItemPtr& update);
+
+ void activate();
+ string id();
+ SessionCollectionPtr sessions();
+
+ void forceUpdate();
+
+ void getAddSessionsTasks(QueuedTasks& tasks, const SessionSeq& sessions);
private:
- size_t mMaxRetries;
- size_t mRetryInterval;
- size_t mCounter;
+
+ boost::shared_mutex mLock;
+
+ //
+ // True if not a standby object.
+ //
+ bool mActivated;
+
+ //
+ // The replicated state.
+ //
+ BridgeStateItemPtr mState;
+
+ //
+ // Helper class for dealing with the sessions.
+ //
+ SessionCollectionPtr mSessions;
+
+ const string mName;
+ Ice::ObjectAdapterPtr mObjAdapter;
+
+ BridgeListenerMgrPtr mListeners;
+ ReplicatorSmartPrx mReplicator;
+
+ //
+ // The bridge's callback implementation for the sessions that get added to it.
+ //
+ SessionListenerPtr mSessionListener;
+ SessionListenerPrx mSessionListenerPrx;
+
+ //
+ // A proxy to this bridge. Used when publishing events.
+ //
+ BridgePrx mPrx;
+
+ //
+ // The logger object.
+ //
+ Logger mLogger;
+
+ void statePreCheck();
+ BridgeStateItemPtr createUpdate();
+ void pushUpdate(const BridgeStateItemPtr& update);
+ void pushUpdates(const ReplicatedStateItemSeq& updates);
+ BridgeListenerStateItemPtr createFirstListenerUpdate(const BridgeListenerPrx& listener);
+
+ bool replicate()
+ {
+ return (mActivated && mReplicator != 0);
+ }
};
+typedef IceUtil::Handle<BridgeImpl> BridgeImplPtr;
-void checkSessions(const AsteriskSCF::SessionCommunications::V1::SessionSeq& sessions)
+//
+// simply checks for nulls at this point.
+//
+static void checkSessions(const SessionSeq& sessions)
{
Ice::LongSeq invalidIndexes;
Ice::Long index = 0;
- for(AsteriskSCF::SessionCommunications::V1::SessionSeq::const_iterator i = sessions.begin();
- i != sessions.end(); ++i, ++index)
+ for (SessionSeq::const_iterator i = sessions.begin(); i != sessions.end(); ++i, ++index)
{
- if(*i == 0)
+ if (*i == 0)
{
invalidIndexes.push_back(index);
}
}
- if(invalidIndexes.size() > 0)
+ if (invalidIndexes.size() > 0)
{
- throw AsteriskSCF::SessionCommunications::V1::InvalidSessions(invalidIndexes);
+ throw InvalidSessions(invalidIndexes);
}
}
-}
-
-//
-// Compiled in constants.
-// TODO: Replace with configuration!
-//
-
-static const std::string TopicPrefix("AsteriskSCF.Bridge.");
-//
-// TODO:
-// Operations that are performed on all bridge sessions might be better done as AMI requests.
-//
-namespace AsteriskSCF
-{
-namespace BridgeService
-{
-//
-// Functor to support using for_each on shutdown.
-//
-class ShutdownImpl : public std::unary_function<BridgeImpl::BridgeSessionPtr, void>
+class SessionsTracker : public IceUtil::Shared
{
public:
- ShutdownImpl(const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx& listener,
- const AsteriskSCF::SessionCommunications::V1::ResponseCodePtr& response) :
- mListener(listener),
- mResponse(response)
+ SessionsTracker() :
+ mResponses(0)
{
}
-
- void operator()(const BridgeImpl::BridgeSessionPtr& b)
+
+ void add(const SessionPrx& s)
{
- try
- {
- b->getSession()->removeBridge(mListener);
- b->disconnect();
- b->getSession()->stop(mResponse);
- }
- catch(const Ice::ObjectNotExistException& ex)
- {
- lg(Debug) << __FUNCTION__ << ":" << __LINE__ << ex.what();
- mNonExistent.push_back(b);
- }
- catch(const Ice::Exception& ex)
- {
- lg(Debug) << __FUNCTION__ << ":" << __LINE__ << ex.what();
- }
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ mSessions.push_back(s);
+ ++mResponses;
+ }
+
+ SessionSeq getSessions()
+ {
+ boost::shared_lock<boost::shared_mutex> lock(mLock);
+ return mSessions;
}
- const std::vector<BridgeImpl::BridgeSessionPtr>& nonExistentObjects()
+ void addException(const SessionPrx& session, const Ice::Exception& ex)
{
- return mNonExistent;
+ addExceptionMessage(session, ex.what());
}
-private:
- AsteriskSCF::SessionCommunications::V1::SessionListenerPrx mListener;
- AsteriskSCF::SessionCommunications::V1::ResponseCodePtr mResponse;
- std::vector<BridgeImpl::BridgeSessionPtr> mNonExistent;
-};
-class ProgressingImpl : public std::unary_function<BridgeImpl::BridgeSessionPtr, void>
-{
-public:
- ProgressingImpl(const AsteriskSCF::SessionCommunications::V1::ResponseCodePtr& response) :
- mResponse(response)
+ void addExceptionMessage(const SessionPrx& session, const string& msg)
{
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ SessionError error;
+ error.failedSession = session;
+ error.message = msg;
+ mExceptions.push_back(error);
+ ++mResponses;
}
- void operator()(const BridgeImpl::BridgeSessionPtr& b)
+ SessionErrorSeq getExceptions()
{
- try
- {
- b->getSession()->progress(mResponse);
- }
- catch(const Ice::ObjectNotExistException& ex)
- {
- mNonExistent.push_back(b);
- lg(Debug) << __FUNCTION__ << ":" << __LINE__ << ex.what();
- }
- catch(const Ice::Exception& ex)
- {
- lg(Debug) << __FUNCTION__ << ":" << __LINE__ << ex.what();
- }
+ boost::shared_lock<boost::shared_mutex> lock(mLock);
+ return mExceptions;
}
- const std::vector<BridgeImpl::BridgeSessionPtr>& nonExistentObjects()
+ size_t responseCount()
{
- return mNonExistent;
+ boost::shared_lock<boost::shared_mutex> lock(mLock);
+ return mResponses;
}
private:
- AsteriskSCF::SessionCommunications::V1::ResponseCodePtr mResponse;
- std::vector<BridgeImpl::BridgeSessionPtr> mNonExistent;
+ boost::shared_mutex mLock;
+ SessionSeq mSessions;
+ SessionErrorSeq mExceptions;
+ size_t mResponses;
};
+typedef IceUtil::Handle<SessionsTracker> SessionsTrackerPtr;
-class RingImpl : public std::unary_function<BridgeImpl::BridgeSessionPtr, void>
+class RemoveSessionsNotify : public QueuedTask
{
public:
- RingImpl(const AsteriskSCF::SessionCommunications::V1::SessionPrx& exclude) :
- mExclude(exclude)
+ RemoveSessionsNotify(const BridgeListenerMgrPtr& bridgeListeners,
+ const SessionsTrackerPtr& tracker) :
+ QueuedTask("RemoveSessionsNotify"),
+ mBridgeListeners(bridgeListeners),
+ mTracker(tracker)
{
}
-
- void operator()(const BridgeImpl::BridgeSessionPtr& b)
+
+protected:
+ bool executeImpl()
{
- if(b->getSession() != mExclude)
+ SessionSeq sessions = mTracker->getSessions();
+ if (!sessions.empty())
{
- try
- {
- b->ring();
- }
- catch(const Ice::ObjectNotExistException& ex)
- {
- mNonExistent.push_back(b);
- lg(Debug) << __FUNCTION__ << ":" << __LINE__ << ex.what();
- }
- catch(const Ice::Exception& ex)
- {
- lg(Debug) << __FUNCTION__ << ":" << __LINE__ << ex.what();
- }
+ mBridgeListeners->sessionsRemoved(sessions);
}
+ return true;
}
-
- const std::vector<BridgeImpl::BridgeSessionPtr>& nonExistentObjects()
- {
- return mNonExistent;
- }
-
+
private:
- AsteriskSCF::SessionCommunications::V1::SessionPrx mExclude;
- std::vector<BridgeImpl::BridgeSessionPtr> mNonExistent;
+ BridgeListenerMgrPtr mBridgeListeners;
+ SessionsTrackerPtr mTracker;
};
-class FlashImpl : public std::unary_function<BridgeImpl::BridgeSessionPtr, void>
+class SetBridgeTask : public QueuedTask
{
public:
- void operator()(const BridgeImpl::BridgeSessionPtr& b)
+ SetBridgeTask(const SessionCollectionPtr& sessionCollection, const BridgePrx& bridge,
+ const SessionListenerPrx& listener, const SessionSeq& sessions, const SessionsTrackerPtr& tracker):
+ QueuedTask("SetBridgeTask"),
+ mSessionManager(sessionCollection),
+ mBridge(bridge),
+ mSessionListener(listener),
+ mSessions(sessions),
+ mTracker(tracker)
{
- try
- {
- b->getSession()->flash();
- }
- catch(const Ice::ObjectNotExistException& ex)
- {
- mNonExistent.push_back(b);
- lg(Debug) << __FUNCTION__ << ":" << __LINE__ << ex.what();
- }
- catch(const Ice::Exception& ex)
- {
- lg(Debug) << __FUNCTION__ << ":" << __LINE__ << ex.what();
- }
}
- const std::vector<BridgeImpl::BridgeSessionPtr>& nonExistentObjects()
+protected:
+ bool executeImpl()
{
- return mNonExistent;
+ bool tasksDispatched = false;
+ for (SessionSeq::iterator i = mSessions.begin(); i != mSessions.end(); ++i)
+ {
+ SessionWrapperPtr session = mSessionManager->addSession(*i);
+ if (session == 0)
+ {
+ //
+ // This shouldn't happen!
+ //
+ mTracker->addExceptionMessage(*i, "session already added");
+ continue;
+ }
+ tasksDispatched = true;
+ (*i)->begin_setBridge(mBridge, mSessionListener,
+ newCallback_Session_setBridge(this, &SetBridgeTask::set,
+ &SetBridgeTask::failed), session);
+ }
+ return !tasksDispatched;
}
-private:
- std::vector<BridgeImpl::BridgeSessionPtr> mNonExistent;
-};
-class HoldImpl : public std::unary_function<BridgeImpl::BridgeSessionPtr, void>
-{
-public:
- void operator()(const BridgeImpl::BridgeSessionPtr& b)
+ void set(const SessionInfoPtr& info, const SessionWrapperPtr& session)
{
- try
+ mTracker->add(session->getSession());
+ if (info->currentState != "ready")
{
- b->getSession()->hold();
- }
- catch(const Ice::ObjectNotExistException& ex)
- {
- mNonExistent.push_back(b);
- lg(Debug) << __FUNCTION__ << ":" << __LINE__ << ex.what();
+ //
+ // setupMedia is an AMI backed implementation, so should not block here.
+ //
+ session->setupMedia();
}
- catch(const Ice::Exception& ex)
+ if (mTracker->responseCount() == mSessions.size())
{
- lg(Debug) << __FUNCTION__ << ":" << __LINE__ << ex.what();
+ mListener->succeeded();
}
}
- const std::vector<BridgeImpl::BridgeSessionPtr>& nonExistentObjects()
- {
- return mNonExistent;
- }
-private:
- std::vector<BridgeImpl::BridgeSessionPtr> mNonExistent;
-};
-
-class UnholdImpl : public std::unary_function<BridgeImpl::BridgeSessionPtr, void>
-{
-public:
- void operator()(const BridgeImpl::BridgeSessionPtr& b)
+ void failed(const Ice::Exception& ex, const SessionWrapperPtr& session)
{
+ //
+ // TODO:
+ // * Log exception.
+ // * Rollback
+ // * Interpret exception and decide whether or not to fail the entire operation.
+ // Currently the semantics allow some operations to fail.
+ //
+ mTracker->addException(session->getSession(), ex);
try
{
- b->getSession()->unhold();
+ //
+ // We want to make sure that session is not left lying around. The session collection will
+ // take care of cleaning it up as long as it is marked as destroyed.
+ //
+ session->destroy();
}
- catch(const Ice::ObjectNotExistException& ex)
+ catch(...)
{
- mNonExistent.push_back(b);
- lg(Debug) << __FUNCTION__ << ":" << __LINE__ << ex.what();
}
- catch(const Ice::Exception& ex)
+ if (mTracker->responseCount() == mSessions.size())
{
- lg(Debug) << __FUNCTION__ << ":" << __LINE__ << ex.what();
- }
- }
-
- const std::vector<BridgeImpl::BridgeSessionPtr>& nonExistentObjects()
- {
- return mNonExistent;
- }
-private:
- std::vector<BridgeImpl::BridgeSessionPtr> mNonExistent;
-};
-
-class ConnectImpl : public std::unary_function<BridgeImpl::BridgeSessionPtr, void>
-{
-public:
- ConnectImpl(const AsteriskSCF::SessionCommunications::V1::SessionPrx& exclude) :
- mExclude(exclude)
- {
- }
-
- void operator()(BridgeImpl::BridgeSessionPtr& b)
- {
- if(b->getSession() != mExclude)
- {
- try
- {
- b->connect();
- }
- catch(const Ice::ObjectNotExistException& ex)
+ SessionErrorSeq exceptions = mTracker->getExceptions();
+ if (exceptions.size() == mSessions.size())
{
- mNonExistent.push_back(b);
- lg(Debug) << __FUNCTION__ << ":" << __LINE__ << ex.what();
+ mListener->failed();
}
- catch(const Ice::Exception& ex)
+ else
{
- lg(Debug) << __FUNCTION__ << ":" << __LINE__ << ex.what();
+ mListener->succeeded();
}
}
}
-
- const std::vector<BridgeImpl::BridgeSessionPtr>& nonExistentObjects()
- {
- return mNonExistent;
- }
-
+
private:
- AsteriskSCF::SessionCommunications::V1::SessionPrx mExclude;
- std::vector<BridgeImpl::BridgeSessionPtr> mNonExistent;
+ SessionCollectionPtr mSessionManager;
+ BridgePrx mBridge;
+ SessionListenerPrx mSessionListener;
+ SessionSeq mSessions;
+ SessionsTrackerPtr mTracker;
};
-class FindImpl : public std::unary_function<BridgeImpl::BridgeSessionPtr, bool>
+class AddToListeners : public QueuedTask
{
public:
- FindImpl(const AsteriskSCF::SessionCommunications::V1::SessionPrx& prx) :
- mPrx(prx)
+ AddToListeners(const BridgeListenerMgrPtr& listeners, const SessionsTrackerPtr& tracker) :
+ QueuedTask("AddToListeners"),
+ mListeners(listeners),
+ mTracker(tracker)
{
}
-
- bool operator()(const BridgeImpl::BridgeSessionPtr& b)
+
+protected:
+ bool executeImpl()
{
- return b->getSession() == mPrx;
+ mListeners->sessionsAdded(mTracker->getSessions());
+ return true;
}
+
private:
- AsteriskSCF::SessionCommunications::V1::SessionPrx mPrx;
+ BridgeListenerMgrPtr mListeners;
+ SessionsTrackerPtr mTracker;
};
-
-//
-// For events that require modification to the bridge, we use helper methods on the bridge itself.
-// For events result in distribution to the bridge sessions, we copy the current sessions and
-// run the calls from the listener itself.
-//
-class SessionListener : public AsteriskSCF::SessionCommunications::V1::SessionListener
+class CheckShutdown : public QueuedTask
{
public:
- SessionListener(const BridgeImplPtr& b) :
- mBridge(b)
+ CheckShutdown(const BridgeImplPtr& bridge, const BridgePrx& proxy) :
+ QueuedTask("CheckShutdown"),
+ mBridge(bridge),
+ mPrx(proxy)
{
}
- void connected(const AsteriskSCF::SessionCommunications::V1::SessionPrx& source, const Ice::Current&)
+protected:
+ bool executeImpl()
{
- try
- {
- mBridge->sessionConnected(source);
- }
- catch(const Ice::Exception& ex)
+ if (mBridge->sessions()->size() < 2 && mPrx)
{
- lg(Debug) << __FUNCTION__ << ":" << __LINE__ << ex.what();
- throw;
+ mPrx->begin_shutdown(newCallback_Bridge_shutdown(this, &CheckShutdown::done,
+ &CheckShutdown::failed));
}
- std::vector<BridgeImpl::BridgeSessionPtr> sessions(mBridge->currentSessions());
- std::for_each(sessions.begin(), sessions.end(), ConnectImpl(source));
+ //
+ // We don't care about the result really. The CheckShutdown instance will hang
+ // around because of the AMI request so the completion of the request will not
+ // have an issue.
+ //
+ return true;
}
- void flashed(const AsteriskSCF::SessionCommunications::V1::SessionPrx&, const Ice::Current&)
+ void done()
{
+ //
+ // We don't care about the ending.
+ //
}
- void held(const AsteriskSCF::SessionCommunications::V1::SessionPrx&, const Ice::Current&)
+ void failed(const Ice::Exception&)
{
+ //
+ // Operationally, we don't really care but we should probably log
+ //
}
+
+private:
+ BridgeImplPtr mBridge;
+ BridgePrx mPrx;
+};
- void progressing(const AsteriskSCF::SessionCommunications::V1::SessionPrx&,
- const AsteriskSCF::SessionCommunications::V1::ResponseCodePtr&, const Ice::Current&)
+template <class T>
+class GenericAMDCallback : public QueuedTask
+{
+public:
+ GenericAMDCallback(const T& cb, const SessionsTrackerPtr& tracker) :
+ QueuedTask("GenericAMDCallback"),
+ mCallback(cb),
+ mTracker(tracker)
{
}
+protected:
- void ringing(const AsteriskSCF::SessionCommunications::V1::SessionPrx& source, const Ice::Current&)
+ bool executeImpl()
{
- std::vector<BridgeImpl::BridgeSessionPtr> sessions(mBridge->currentSessions());
- if(sessions.size() > 0)
- {
- std::for_each(sessions.begin(), sessions.end(), RingImpl(source));
- }
+ mCallback->ice_response();
+ return true;
}
- void stopped(const AsteriskSCF::SessionCommunications::V1::SessionPrx& source,
- const AsteriskSCF::SessionCommunications::V1::ResponseCodePtr& response, const Ice::Current&)
+ void failImpl()
{
- size_t endpointCount = mBridge->sessionStopped(source, response);
- if(endpointCount < 2)
+ SessionErrorSeq errors(mTracker->getExceptions());
+ if (!errors.empty())
{
- mBridge->spawnShutdown();
+ mCallback->ice_exception(BridgeSessionOperationFailed(errors));
+ }
+ else
+ {
+ mCallback->ice_exception();
}
}
- void unheld(const AsteriskSCF::SessionCommunications::V1::SessionPrx&, const Ice::Current&)
+private:
+ T mCallback;
+ SessionsTrackerPtr mTracker;
+};
+
+class UpdateTask : public QueuedTask
+{
+public:
+ UpdateTask(const BridgeImplPtr& bridge) :
+ QueuedTask("UpdateTask"),
+ mBridge(bridge)
{
}
+protected:
+ bool executeImpl()
+ {
+ mBridge->forceUpdate();
+ return true;
+ }
private:
BridgeImplPtr mBridge;
};
-} // End of namespace BridgeService
-} // End of namespace AsteriskSCF
-
-AsteriskSCF::BridgeService::BridgeImpl::BridgeImpl(const Ice::ObjectAdapterPtr& adapter,
- const std::vector<AsteriskSCF::SessionCommunications::V1::BridgeListenerPrx>& listeners,
- const AsteriskSCF::BridgeService::BridgeListenerMgrPtr& listenerMgr,
- const AsteriskSCF::SessionCommunications::V1::BridgePrx& prx) :
- mState(Running),
+} // End of anonymous namespace
+
+BridgeImpl::BridgeImpl(const string& name, const Ice::ObjectAdapterPtr& adapter,
+ const vector<BridgeListenerPrx>& listeners,
+ const BridgeListenerMgrPtr& listenerMgr,
+ const ReplicatorSmartPrx& replicator,
+ const BridgeStateItemPtr& state,
+ const Logger& logger) :
+ mActivated(false),
+ mState(state),
+ mSessions(new SessionCollection(adapter->getCommunicator(), name, replicator, logger)),
+ mName(name),
mObjAdapter(adapter),
mListeners(listenerMgr),
- mSessionListener(new SessionListener(this)),
- mPrx(prx)
+ mReplicator(replicator),
+ mSessionListener(createSessionListener(mSessions, logger)),
+ mLogger(logger)
{
- for(std::vector<AsteriskSCF::SessionCommunications::V1::BridgeListenerPrx>::const_iterator i = listeners.begin();
+ mLogger(Debug) << FUNLOG << ": creating a new Bridge with " << listeners.size() << " default listeners";
+ for (vector<BridgeListenerPrx>::const_iterator i = listeners.begin();
i != listeners.end(); ++i)
{
mListeners->addListener(*i);
}
- std::string listenerId = mObjAdapter->getCommunicator()->identityToString(prx->ice_getIdentity());
- listenerId += ".sessionListener";
- mSessionListenerPrx =
- AsteriskSCF::SessionCommunications::V1::SessionListenerPrx::uncheckedCast(
- mObjAdapter->add(mSessionListener, mObjAdapter->getCommunicator()->stringToIdentity(listenerId))
- );
}
-AsteriskSCF::BridgeService::BridgeImpl::~BridgeImpl()
+BridgeImpl::~BridgeImpl()
{
//
// TODO: Determine if we need to clean up the listener manager. We may not
@@ -459,525 +522,507 @@ AsteriskSCF::BridgeService::BridgeImpl::~BridgeImpl()
//
}
-void AsteriskSCF::BridgeService::BridgeImpl::addSessions(
- const AsteriskSCF::SessionCommunications::V1::SessionSeq& sessions, const Ice::Current& current)
+void BridgeImpl::addSessions_async(const AMD_Bridge_addSessionsPtr& callback, const SessionSeq& sessions,
+ const Ice::Current& current)
{
- if(sessions.size() == 0)
- {
- return;
- }
- checkSessions(sessions);
- AsteriskSCF::SessionCommunications::V1::SessionSeq addedSessions;
+ try
{
- boost::unique_lock<boost::shared_mutex> lock(mLock);
- statePreCheck();
- lg(Debug) << __FUNCTION__ << ": adding " << sessions.size() << " sessions to "
- << current.adapter->getCommunicator()->identityToString(current.id);
- for(AsteriskSCF::SessionCommunications::V1::SessionSeq::const_iterator i = sessions.begin();
- i != sessions.end(); ++i)
+ if (sessions.empty())
{
- //
- // TODO: how do we want to handle sessions that have already been added to the bridge. Its pretty much
- // impossible to guard against race conditions where multiple call paths might want to add a session
- // more than once for some reason. We should probably just log it and move on.
- //
- std::vector<BridgeSessionPtr>::iterator j =
- find_if(mSessions.begin(), mSessions.end(), AsteriskSCF::BridgeService::FindImpl(*i));
- if(j != mSessions.end())
- {
- lg(Debug) << __FUNCTION__ << ": " << (*i)->ice_toString()
- << " is already registered with this bridge.";
- continue;
- }
-
- AsteriskSCF::SessionCommunications::V1::SessionInfoPtr info;
- try
+ if (callback)
{
- RetryPolicy policy(5, 500);
- //
- // canRetry should never return false since we throw ourselves out of this loop. But
- // we'll do it here in case we decide to do something else.
- //
- while(policy.canRetry())
- {
- try
- {
- info = (*i)->setBridge(mPrx, mSessionListenerPrx);
- break;
- }
- catch(const Ice::ConnectionLostException&)
- {
- if(!policy.retry())
- {
- throw;
- }
- }
- }
+ callback->ice_response();
}
- catch(const Ice::Exception& ex)
- {
- lg(Debug) << __FUNCTION__ << ": " << (*i)->ice_toString() << " threw " << ex.what()
- << " continuing";
- }
- //
- // We need to define these states! Especially the ones that define when start is called or not.
- //
- if(info->currentState == "ready")
- {
- lg(Debug) << __FUNCTION__ << ": " << (*i)->ice_toString()
- << " current state is ready (not yet connected), not establishing media connections.";
- mSessions.push_back(new BridgeSession(*i, 0, false));
- }
- else
- {
- lg(Debug) << __FUNCTION__ << ": " << (*i)->ice_toString()
- << " media is expected to be establishing, plugging media into bridge.";
- mSessions.push_back(new BridgeSession(*i, mSplicer.connect(*i), false));;
- }
-
- addedSessions.push_back(*i);
+ return;
}
+ checkSessions(sessions);
+ statePreCheck();
+ mLogger(Debug) << FUNLOG << ": adding " << sessions.size() << " sessions";
+
+ SessionsTrackerPtr tracker(new SessionsTracker);
+ QueuedTasks tasks;
+ tasks.push_back(new SetBridgeTask(mSessions, mPrx, mSessionListenerPrx, sessions, tracker));
+ tasks.push_back(new AddToListeners(mListeners, tracker));
+ tasks.push_back(new GenericAMDCallback<AMD_Bridge_addSessionsPtr>(callback, tracker));
+ tasks.push_back(new UpdateTask(this));
+ ExecutorPtr executor(new Executor(tasks, mLogger));
+ executor->start();
+ //
+ // When the operations have all completed, that last task will take care of handling
+ // the callback. It's all left within the try/catch in the event something happens during
+ // task startup.
+ //
}
- if(addedSessions.size())
+ catch (const std::exception& ex)
{
- mListeners->sessionsAdded(addedSessions);
+ callback->ice_exception(ex);
+ }
+ catch (...)
+ {
+ callback->ice_exception();
}
}
-void AsteriskSCF::BridgeService::BridgeImpl::removeSessions(
- const AsteriskSCF::SessionCommunications::V1::SessionSeq& sessions, const Ice::Current&)
+void BridgeImpl::removeSessions_async(const AMD_Bridge_removeSessionsPtr& callback, const SessionSeq& sessions,
+ const Ice::Current&)
{
- if(sessions.size() == 0)
- {
- return;
- }
- checkSessions(sessions);
- AsteriskSCF::SessionCommunications::V1::SessionSeq removedSessions;
+ try
{
- boost::unique_lock<boost::shared_mutex> lock(mLock);
+ if (sessions.empty())
+ {
+ callback->ice_response();
+ return;
+ }
+ checkSessions(sessions);
statePreCheck();
- for(AsteriskSCF::SessionCommunications::V1::SessionSeq::const_iterator i = sessions.begin();
- i != sessions.end(); ++i)
+
+ //
+ // The shutdown of individual sessions is implemented as a series of AMI requests. Once initiated,
+ // we allow them to proceed asynchronously and do not concern ourselves with the result.
+ // The logic of shutdown should remove them either because the operations succeeded or
+ // *couldn't* be accomplished because of some terminal condition. At any rate, waiting around for them
+ // is pointless.
+ //
+ SessionsTrackerPtr removed(new SessionsTracker);
+ for (SessionSeq::const_iterator i = sessions.begin(); i != sessions.end(); ++i)
{
- std::vector<BridgeSessionPtr>::iterator j = std::find_if(mSessions.begin(),
- mSessions.end(), AsteriskSCF::BridgeService::FindImpl(*i));
- if(j != mSessions.end())
+ SessionWrapperPtr session = mSessions->getSession(*i);
+ if (session)
{
- try
- {
- (*j)->getSession()->removeBridge(mSessionListenerPrx);
- }
- catch(const Ice::Exception& ex)
- {
- lg(Info) << __FUNCTION__ << ": removingthe bridge from " << (*j)->getSession() << " threw "
- << ex.what();
- }
- (*j)->disconnect();
- mSessions.erase(j);
... 8618 lines suppressed ...
--
asterisk-scf/integration/bridging.git
More information about the asterisk-scf-commits
mailing list