[asterisk-scf-commits] asterisk-scf/release/bridging.git branch "master" updated.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Sun Oct 2 17:54:37 CDT 2011
branch "master" has been updated
via 1e99ad99de150e86169a828a6341717dad05e110 (commit)
from 7d67896288e9087ba634e48430f0417f3e3e1526 (commit)
Summary of changes:
src/BridgeImpl.cpp | 178 +++++++++++++++++++++++++++++++++++++++++++++
src/SessionOperations.cpp | 103 ++++++++++++++++++++++++++
src/SessionOperations.h | 38 ++++++++++
test/TestBridging.cpp | 101 +++++++++++++++++++++++++
4 files changed, 420 insertions(+), 0 deletions(-)
- Log -----------------------------------------------------------------
commit 1e99ad99de150e86169a828a6341717dad05e110
Author: Ken Hunt <ken.hunt at digium.com>
Date: Sun Oct 2 17:53:26 2011 -0500
Added connect/disconnect of telephony event sources and sinks.
diff --git a/src/BridgeImpl.cpp b/src/BridgeImpl.cpp
index 36ea2dd..0c94a05 100755
--- a/src/BridgeImpl.cpp
+++ b/src/BridgeImpl.cpp
@@ -973,6 +973,175 @@ private:
BridgeImplPtr mBridge;
};
+class ConnectTelephonyEventsTask: public QueuedTask
+{
+public:
+ ConnectTelephonyEventsTask(const BridgeImplPtr& bridge,
+ const SessionSeq& newSessions,
+ const Logger& logger)
+ : QueuedTask("ConnectTelephonyEventsTask"),
+ mBridge(bridge),
+ mNewSessions(newSessions),
+ mLogger(logger)
+ {
+ }
+
+protected:
+ bool executeImpl()
+ {
+ vector<TelephonySessionPrx> ignoreList;
+
+ // Assumption: All the sessions we need to connect to are already
+ // part of the bridge's session collection.
+ for (SessionSeq::iterator i = mNewSessions.begin(); i != mNewSessions.end(); ++i)
+ {
+ TelephonySessionPrx telephonySession;
+ try
+ {
+ telephonySession = TelephonySessionPrx::checkedCast(*i);
+ }
+ catch(...)
+ {
+ continue;
+ }
+
+ if (telephonySession == 0)
+ {
+ continue;
+ }
+
+ ConnectTelephonyOperation op(telephonySession, ignoreList, mLogger);
+ mBridge->getSessions()->visitSessions(op);
+
+ // Since this session is now connected to all others in the bridge (including
+ // those in the mNewSessions set) we don't need to connect this one again.
+ ignoreList.push_back(telephonySession);
+ }
+ return true;
+ }
+
+private:
+ BridgeImplPtr mBridge;
+ SessionSeq mNewSessions;
+ Logger mLogger;
+};
+
+class DisconnectTelephonyEventsTask: public QueuedTask
+{
+public:
+ DisconnectTelephonyEventsTask(const BridgeImplPtr& bridge,
+ const SessionSeq& disconnectingSessions,
+ const Logger& logger)
+ : QueuedTask("DisconnectTelephonyEventsTask"),
+ mBridge(bridge),
+ mDisconnectingSessions(disconnectingSessions),
+ mLogger(logger)
+ {
+ }
+
+protected:
+ bool executeImpl()
+ {
+ for (SessionSeq::iterator i = mDisconnectingSessions.begin(); i != mDisconnectingSessions.end(); ++i)
+ {
+ TelephonySessionPrx telephonySession;
+ try
+ {
+ telephonySession = TelephonySessionPrx::checkedCast(*i);
+ }
+ catch(...)
+ {
+ continue;
+ }
+
+ if (telephonySession == 0)
+ {
+ continue;
+ }
+
+ DisconnectTelephonyOperation op(telephonySession, mLogger);
+ mBridge->getSessions()->visitSessions(op);
+ }
+
+ /**
+ * In ConnectTelephonyEventsTask, we could assume all the sessions were in the bridge's session collection
+ * and just use a visitor. But when a session is removed, AMI operations are involved. We use an
+ * extra pass over the set of sessions being removed, and disconnect each one from the others, to be safe.
+ */
+ disconnectMembers();
+
+ return true;
+ }
+
+ /**
+ * Disconnect the members of mDisconnectingSessions from each other.
+ */
+ void disconnectMembers()
+ {
+ for(SessionSeq::iterator i = mDisconnectingSessions.begin();
+ i != mDisconnectingSessions.end(); ++i)
+ {
+ TelephonySessionPrx session1 = TelephonySessionPrx::checkedCast(*i);
+ if (session1 == 0)
+ {
+ // If not a telephony session, nothing to do.
+ continue;
+ }
+
+ disconnectMembersFrom(session1);
+ }
+ }
+
+ void disconnectMembersFrom(TelephonySessionPrx session1)
+ {
+ for(SessionSeq::iterator i = mDisconnectingSessions.begin();
+ i != mDisconnectingSessions.end(); ++i)
+ {
+ if (session1->ice_getIdentity() == (*i)->ice_getIdentity())
+ {
+ // It's not connected to itself.
+ continue;
+ }
+
+ TelephonySessionPrx session2 = TelephonySessionPrx::checkedCast(*i);
+ if (session2== 0)
+ {
+ // If not a telephony session, nothing to do.
+ continue;
+ }
+
+ disconnectSinks(session1, session2);
+ disconnectSinks(session2, session1);
+ }
+ }
+
+ void disconnectSinks
+ (const TelephonySessionPrx& sourceSession,
+ const TelephonySessionPrx& sinkSession)
+ {
+ if (sourceSession->ice_getIdentity() == sinkSession->ice_getIdentity())
+ {
+ // Not connected to ourselves.
+ return;
+ }
+
+ TelephonyEventSinkSeq sinksToRemove = sinkSession->getSinks();
+ TelephonyEventSourceSeq fromSources = sourceSession->getSources();
+
+ for(TelephonyEventSourceSeq::iterator i=fromSources.begin();
+ i != fromSources.end(); ++i)
+ {
+ (*i)->removeSinks(sinksToRemove);
+ }
+ }
+
+private:
+ BridgeImplPtr mBridge;
+ SessionSeq mDisconnectingSessions;
+ Logger mLogger;
+};
+
+
} // End of anonymous namespace
BridgeImpl::BridgeImpl(const string& name, const Ice::ObjectAdapterPtr& adapter,
@@ -1082,6 +1251,7 @@ void BridgeImpl::addSessions_async(const AMD_Bridge_addSessionsPtr& callback, co
tasks.push_back(new SetAndGetSessionControllerTask(mObjAdapter, mSessions, sessions, this, mLogger));
tasks.push_back(new GenericAMDCallback<AMD_Bridge_addSessionsPtr>(callback, tracker));
tasks.push_back(new SetupMedia(this));
+ tasks.push_back(new ConnectTelephonyEventsTask(this, sessions, mLogger));
tasks.push_back(new UpdateTask(this));
ExecutorPtr executor(new Executor(tasks, mLogger));
executor->start();
@@ -1141,6 +1311,7 @@ void BridgeImpl::removeSessions_async(const AMD_Bridge_removeSessionsPtr& callba
}
tasks.push_back(new RemoveSessionsNotify(mListeners, removed, cookies));
tasks.push_back(new GenericAMDCallback<AMD_Bridge_removeSessionsPtr>(callback, removed));
+ tasks.push_back(new DisconnectTelephonyEventsTask(this, sessions, mLogger));
tasks.push_back(new CheckShutdown(this, mPrx));
ExecutorPtr runner(new Executor(tasks, mLogger));
runner->start();
@@ -1342,6 +1513,10 @@ void BridgeImpl::replaceSession_async(const AMD_Bridge_replaceSessionPtr& callba
boost::shared_lock<boost::shared_mutex> lock(mLock);
cookies = getCookies();
}
+
+ SessionSeq removedSessions;
+ removedSessions.push_back(sessionToReplace);
+
tasks.push_back(new RemoveSessionsNotify(mListeners, removeTracker, cookies));
tasks.push_back(new UnplugMedia(this));
tasks.push_back(new SetBridgeTask(mSessions, mPrx, mSessionListenerPrx, newSessions, tracker));
@@ -1349,6 +1524,8 @@ void BridgeImpl::replaceSession_async(const AMD_Bridge_replaceSessionPtr& callba
tasks.push_back(new SetAndGetSessionControllerTask(mObjAdapter, mSessions, newSessions, this, mLogger));
tasks.push_back(new GenericAMDCallback<AMD_Bridge_replaceSessionPtr>(callback, tracker));
tasks.push_back(new SetupMedia(this));
+ tasks.push_back(new ConnectTelephonyEventsTask(this, newSessions, mLogger));
+ tasks.push_back(new DisconnectTelephonyEventsTask(this, removedSessions, mLogger));
tasks.push_back(new UpdateTask(this));
ExecutorPtr executor(new Executor(tasks, mLogger));
executor->start();
@@ -1575,6 +1752,7 @@ void BridgeImpl::getAddSessionsTasks(QueuedTasks& tasks,
tasks.push_back(new AddToListeners(mListeners, tracker, getCookies()));
tasks.push_back(new SetAndGetSessionControllerTask(mObjAdapter, mSessions, sessions, this, mLogger));
tasks.push_back(new SetupMedia(this));
+ tasks.push_back(new ConnectTelephonyEventsTask(this, sessions, mLogger));
tasks.push_back(new UpdateTask(this));
}
diff --git a/src/SessionOperations.cpp b/src/SessionOperations.cpp
index e6bf5f8..38c0184 100644
--- a/src/SessionOperations.cpp
+++ b/src/SessionOperations.cpp
@@ -233,3 +233,106 @@ void RemoveStreamsOperation::removed(const SessionWrapperPtr& session)
void RemoveStreamsOperation::failed(const Ice::Exception&, const SessionWrapperPtr&)
{
}
+
+ConnectTelephonyOperation::ConnectTelephonyOperation(const TelephonySessionPrx& sessionToConnect,
+ const vector<TelephonySessionPrx>& ignoreList,
+ const Logger& logger)
+ : mSessionToConnect(sessionToConnect),
+ mIgnoreList(ignoreList),
+ mLogger(logger)
+{
+}
+
+void ConnectTelephonyOperation::operator()(const SessionWrapperPtr& session)
+{
+ if (session->getSession()->ice_getIdentity() == mSessionToConnect->ice_getIdentity())
+ {
+ // Let's not connect a session to itself.
+ return;
+ }
+
+ TelephonySessionPrx visitedTelephonySession = TelephonySessionPrx::checkedCast(session->getSession());
+ if (visitedTelephonySession == 0)
+ {
+ // If the session being visited isn't telephony, nothing to do.
+ return;
+ }
+
+ if (inIgnoreList(visitedTelephonySession))
+ {
+ return;
+ }
+
+ connectSinks(mSessionToConnect, visitedTelephonySession);
+ connectSinks(visitedTelephonySession, mSessionToConnect);
+
+}
+
+bool ConnectTelephonyOperation::inIgnoreList(const TelephonySessionPrx& session)
+{
+ for(vector<TelephonySessionPrx>::iterator i=mIgnoreList.begin();
+ i != mIgnoreList.end(); ++i)
+ {
+ if (session->ice_getIdentity() == (*i)->ice_getIdentity())
+ {
+ return true;
+ }
+ }
+
+ return false;
+}
+
+void ConnectTelephonyOperation::connectSinks
+ (const TelephonySessionPrx& sourceSession,
+ const TelephonySessionPrx& sinkSession)
+{
+ TelephonyEventSinkSeq sinksToAdd = sinkSession->getSinks();
+ TelephonyEventSourceSeq toSources = sourceSession->getSources();
+
+ for(TelephonyEventSourceSeq::iterator i=toSources.begin();
+ i != toSources.end(); ++i)
+ {
+ (*i)->addSinks(sinksToAdd);
+ }
+}
+
+DisconnectTelephonyOperation::DisconnectTelephonyOperation(const TelephonySessionPrx& sessionToDisconnect,
+ const Logger& logger)
+ : mSessionToDisconnect(sessionToDisconnect),
+ mLogger(logger)
+{
+}
+
+void DisconnectTelephonyOperation::operator()(const SessionWrapperPtr& visitedSession)
+{
+ if (visitedSession->getSession()->ice_getIdentity() == mSessionToDisconnect->ice_getIdentity())
+ {
+ // Not connected to ourselves.
+ return;
+ }
+
+ TelephonySessionPrx visitedTelephonySession = TelephonySessionPrx::checkedCast(visitedSession->getSession());
+ if (visitedTelephonySession == 0)
+ {
+ // If the session being visited isn't telephony, nothing to do.
+ return;
+ }
+
+ disconnectSinks(mSessionToDisconnect, visitedTelephonySession);
+ disconnectSinks(visitedTelephonySession, mSessionToDisconnect);
+
+}
+
+void DisconnectTelephonyOperation::disconnectSinks
+ (const TelephonySessionPrx& sourceSession,
+ const TelephonySessionPrx& sinkSession)
+{
+ TelephonyEventSinkSeq sinksToRemove = sinkSession->getSinks();
+ TelephonyEventSourceSeq fromSources = sourceSession->getSources();
+
+ for(TelephonyEventSourceSeq::iterator i=fromSources.begin();
+ i != fromSources.end(); ++i)
+ {
+ (*i)->removeSinks(sinksToRemove);
+ }
+}
diff --git a/src/SessionOperations.h b/src/SessionOperations.h
index a4e2c29..042fdce 100644
--- a/src/SessionOperations.h
+++ b/src/SessionOperations.h
@@ -117,6 +117,44 @@ private:
typedef IceUtil::Handle<AddStreamsOperation> AddStreamsOperationPtr;
+class ConnectTelephonyOperation : public std::unary_function<SessionWrapperPtr, void>, public Ice::Object
+{
+public:
+ ConnectTelephonyOperation(const AsteriskSCF::SessionCommunications::V1::TelephonySessionPrx& sessionToConnect,
+ const std::vector<AsteriskSCF::SessionCommunications::V1::TelephonySessionPrx>& ignoreList,
+ const AsteriskSCF::System::Logging::Logger& logger);
+
+ void operator()(const SessionWrapperPtr& session);
+
+private:
+ void connectSinks(const AsteriskSCF::SessionCommunications::V1::TelephonySessionPrx& sourceSession,
+ const AsteriskSCF::SessionCommunications::V1::TelephonySessionPrx& sinkSession);
+
+ bool inIgnoreList(const AsteriskSCF::SessionCommunications::V1::TelephonySessionPrx& session);
+
+ AsteriskSCF::SessionCommunications::V1::TelephonySessionPrx mSessionToConnect;
+ std::vector<AsteriskSCF::SessionCommunications::V1::TelephonySessionPrx> mIgnoreList;
+ AsteriskSCF::System::Logging::Logger mLogger;
+};
+typedef IceUtil::Handle<ConnectTelephonyOperation> ConnectTelephonyOperationPtr;
+
+class DisconnectTelephonyOperation : public std::unary_function<SessionWrapperPtr, void>, public Ice::Object
+{
+public:
+ DisconnectTelephonyOperation(const AsteriskSCF::SessionCommunications::V1::TelephonySessionPrx& sessionToDisconnect,
+ const AsteriskSCF::System::Logging::Logger& logger);
+
+ void operator()(const SessionWrapperPtr& session);
+
+private:
+ void disconnectSinks(const AsteriskSCF::SessionCommunications::V1::TelephonySessionPrx& sourceSession,
+ const AsteriskSCF::SessionCommunications::V1::TelephonySessionPrx& sinkSession);
+
+ AsteriskSCF::SessionCommunications::V1::TelephonySessionPrx mSessionToDisconnect;
+ AsteriskSCF::System::Logging::Logger mLogger;
+};
+typedef IceUtil::Handle<DisconnectTelephonyOperation> DisconnectTelephonyOperationPtr;
+
class RemoveStreamsOperation : public std::unary_function<SessionWrapperPtr, void>, public Ice::Object
{
public:
diff --git a/test/TestBridging.cpp b/test/TestBridging.cpp
index b7efa3f..0476241 100644
--- a/test/TestBridging.cpp
+++ b/test/TestBridging.cpp
@@ -1351,6 +1351,105 @@ public:
}
}
+ void telephonyConnectTest()
+ {
+ try
+ {
+ IceEnvironment testEnv(env()->properties());
+ try
+ {
+ Ice::ObjectAdapterPtr testAdapter = testEnv.communicator()->createObjectAdapter("TestUtilAdapter");
+ testAdapter->activate();
+ BridgeManagerListenerIPtr servant = new BridgeManagerListenerI;
+ AsteriskSCF::SessionCommunications::V1::BridgeManagerListenerPrx listenerPrx;
+ addServant(listenerPrx, testAdapter, servant, testEnv.strToIdent(IceUtil::generateUUID()));
+
+ BridgeManagerPrx mgrPrx = env()->primaryBridgeManager();
+ BOOST_CHECK(mgrPrx);
+ mgrPrx->addListener(listenerPrx);
+
+ SessionSeq sessions;
+ TestChannelWrapper channel(env()->properties());
+
+ SessionPrx a = channel.getSession("311");
+ TelephonySessionPrx ta = TelephonySessionPrx::checkedCast(a);
+ BOOST_CHECK(ta != 0);
+
+ TelephonyEventSourceSeq aSources = ta->getSources();
+ TelephonyEventSinkSeq aSinks = ta->getSinks();
+ BOOST_CHECK(aSources.size() == 1);
+ BOOST_CHECK(aSinks.size() == 1);
+ TelephonyEventSinkSeq aConnected = aSources[0]->getSinks();
+ BOOST_CHECK(aConnected.size() == 0);
+
+ SessionPrx b = channel.getSession("312");
+ TelephonySessionPrx tb = TelephonySessionPrx::checkedCast(b);
+ BOOST_CHECK(tb != 0);
+
+ TelephonyEventSourceSeq bSources = tb->getSources();
+ TelephonyEventSinkSeq bSinks = tb->getSinks();
+ BOOST_CHECK(bSources.size() == 1);
+ BOOST_CHECK(bSinks.size() == 1);
+ TelephonyEventSinkSeq bConnected = bSources[0]->getSinks();
+ BOOST_CHECK(bConnected.size() == 0);
+
+ sessions.push_back(a);
+ sessions.push_back(b);
+ BridgePrx bridge(mgrPrx->createBridge(sessions, 0));
+
+ IceUtil::ThreadControl::sleep(IceUtil::Time::seconds(2));
+
+ // Now that we're bridged, ensure the sinks are connected as expected.
+ aConnected = aSources[0]->getSinks();
+ BOOST_CHECK(aConnected.size() == 1);
+ BOOST_CHECK(aConnected[0]->ice_getIdentity() == bSinks[0]->ice_getIdentity());
+
+ bConnected = bSources[0]->getSinks();
+ BOOST_CHECK(bConnected.size() == 1);
+ BOOST_CHECK(bConnected[0]->ice_getIdentity() == aSinks[0]->ice_getIdentity());
+
+ SessionPrx c = channel.getSession("314");
+ TelephonySessionPrx tc = TelephonySessionPrx::checkedCast(c);
+ BOOST_CHECK(tc != 0);
+
+ sessions.clear();
+ sessions.push_back(c);
+ TelephonyEventSourceSeq cSources = tc->getSources();
+ TelephonyEventSinkSeq cSinks = tc->getSinks();
+
+ bridge->replaceSession(b, sessions);
+
+ IceUtil::ThreadControl::sleep(IceUtil::Time::seconds(2));
+
+ // We've replaced b with c. Make sure all is well, including disconnecting b.
+ aConnected = aSources[0]->getSinks();
+ BOOST_CHECK(aConnected.size() == 1);
+ BOOST_CHECK(aConnected[0]->ice_getIdentity() == cSinks[0]->ice_getIdentity());
+
+ TelephonyEventSinkSeq cConnected = cSources[0]->getSinks();
+ BOOST_CHECK(cConnected.size() == 1);
+ BOOST_CHECK(cConnected[0]->ice_getIdentity() == aSinks[0]->ice_getIdentity());
+
+ bConnected = bSources[0]->getSinks();
+ BOOST_CHECK(bConnected.size() == 0);
+ }
+ catch (const Ice::Exception& ex)
+ {
+ std::ostringstream msg;
+ msg << "Unexpected Ice exception " << ex.what();
+ BOOST_FAIL(msg.str());
+ }
+ catch (...)
+ {
+ BOOST_FAIL("Unexpected exception");
+ }
+ }
+ catch (...)
+ {
+ BOOST_FAIL("Unexpected exception");
+ }
+ }
+
private:
TestEnvironmentPtr mTestEnvironment;
};
@@ -1361,6 +1460,8 @@ bool init_unit_test()
{
boost::shared_ptr<BridgeTester> bridgeTester(new BridgeTester(globalTestEnvironment));
framework::master_test_suite().
+ add(BOOST_TEST_CASE(boost::bind(&BridgeTester::telephonyConnectTest, bridgeTester)));
+ framework::master_test_suite().
add(BOOST_TEST_CASE(boost::bind(&BridgeTester::createEmptyBridge, bridgeTester)));
framework::master_test_suite().
add(BOOST_TEST_CASE(boost::bind(&BridgeTester::simpleBridgingTest, bridgeTester)));
-----------------------------------------------------------------------
--
asterisk-scf/release/bridging.git
More information about the asterisk-scf-commits
mailing list