[asterisk-scf-commits] asterisk-scf/release/bridging.git branch "master" updated.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Sun Oct 2 17:54:37 CDT 2011


branch "master" has been updated
       via  1e99ad99de150e86169a828a6341717dad05e110 (commit)
      from  7d67896288e9087ba634e48430f0417f3e3e1526 (commit)

Summary of changes:
 src/BridgeImpl.cpp        |  178 +++++++++++++++++++++++++++++++++++++++++++++
 src/SessionOperations.cpp |  103 ++++++++++++++++++++++++++
 src/SessionOperations.h   |   38 ++++++++++
 test/TestBridging.cpp     |  101 +++++++++++++++++++++++++
 4 files changed, 420 insertions(+), 0 deletions(-)


- Log -----------------------------------------------------------------
commit 1e99ad99de150e86169a828a6341717dad05e110
Author: Ken Hunt <ken.hunt at digium.com>
Date:   Sun Oct 2 17:53:26 2011 -0500

    Added connect/disconnect of telephony event sources and sinks.

diff --git a/src/BridgeImpl.cpp b/src/BridgeImpl.cpp
index 36ea2dd..0c94a05 100755
--- a/src/BridgeImpl.cpp
+++ b/src/BridgeImpl.cpp
@@ -973,6 +973,175 @@ private:
     BridgeImplPtr mBridge;
 };
 
+class ConnectTelephonyEventsTask: public QueuedTask
+{
+public:
+    ConnectTelephonyEventsTask(const BridgeImplPtr& bridge,
+                               const SessionSeq& newSessions,
+                               const Logger& logger) 
+       : QueuedTask("ConnectTelephonyEventsTask"), 
+         mBridge(bridge), 
+         mNewSessions(newSessions), 
+         mLogger(logger) 
+    { 
+    }
+
+protected:
+    bool executeImpl()
+    {
+        vector<TelephonySessionPrx> ignoreList;
+
+        // Assumption: All the sessions we need to connect to are already 
+        // part of the bridge's session collection. 
+        for (SessionSeq::iterator i = mNewSessions.begin(); i != mNewSessions.end(); ++i)
+        {
+            TelephonySessionPrx telephonySession;
+            try
+            {
+                telephonySession = TelephonySessionPrx::checkedCast(*i);
+            }
+            catch(...)
+            {
+                continue;
+            }
+
+            if (telephonySession == 0)
+            {
+                continue;
+            }
+
+            ConnectTelephonyOperation op(telephonySession, ignoreList, mLogger);
+            mBridge->getSessions()->visitSessions(op);
+
+            // Since this session is now connected to all others in the bridge (including
+            // those in the mNewSessions set) we don't need to connect this one again.
+            ignoreList.push_back(telephonySession);
+        }
+        return true;
+    }
+
+private:
+    BridgeImplPtr mBridge;
+    SessionSeq mNewSessions;
+    Logger mLogger;
+};
+
+class DisconnectTelephonyEventsTask: public QueuedTask
+{
+public:
+    DisconnectTelephonyEventsTask(const BridgeImplPtr& bridge,
+                               const SessionSeq& disconnectingSessions,
+                               const Logger& logger) 
+       : QueuedTask("DisconnectTelephonyEventsTask"), 
+         mBridge(bridge), 
+         mDisconnectingSessions(disconnectingSessions), 
+         mLogger(logger) 
+    { 
+    }
+
+protected:
+    bool executeImpl()
+    {
+        for (SessionSeq::iterator i = mDisconnectingSessions.begin(); i != mDisconnectingSessions.end(); ++i)
+        {
+            TelephonySessionPrx telephonySession;
+            try
+            {
+                telephonySession = TelephonySessionPrx::checkedCast(*i);
+            }
+            catch(...)
+            {
+                continue;
+            }
+
+            if (telephonySession == 0)
+            {
+                continue;
+            }
+
+            DisconnectTelephonyOperation op(telephonySession, mLogger);
+            mBridge->getSessions()->visitSessions(op);
+        }
+
+        /**
+         * In ConnectTelephonyEventsTask, we could assume all the sessions were in the bridge's session collection
+         * and just use a visitor. But when a session is removed, AMI operations are involved. We use an 
+         * extra pass over the set of sessions being removed, and disconnect each one from the others, to be safe.
+         */
+        disconnectMembers();
+
+        return true;
+    }
+
+    /**
+     * Disconnect the members of mDisconnectingSessions from each other.
+     */
+    void disconnectMembers()
+    {
+        for(SessionSeq::iterator i = mDisconnectingSessions.begin();
+            i != mDisconnectingSessions.end(); ++i)
+        {
+            TelephonySessionPrx session1 = TelephonySessionPrx::checkedCast(*i);
+            if (session1 == 0)
+            {
+                // If not a telephony session, nothing to do. 
+                continue;
+            }
+
+            disconnectMembersFrom(session1);
+        }
+    }
+
+    void disconnectMembersFrom(TelephonySessionPrx session1)
+    {
+        for(SessionSeq::iterator i = mDisconnectingSessions.begin();
+            i != mDisconnectingSessions.end(); ++i)
+        {
+            if (session1->ice_getIdentity() == (*i)->ice_getIdentity())
+            {
+                // It's not connected to itself.
+                continue;
+            }
+
+            TelephonySessionPrx session2 = TelephonySessionPrx::checkedCast(*i);
+            if (session2== 0)
+            {
+                // If not a telephony session, nothing to do. 
+                continue;
+            }
+
+            disconnectSinks(session1, session2);
+            disconnectSinks(session2, session1);
+        }
+    }
+
+    void disconnectSinks
+        (const TelephonySessionPrx& sourceSession, 
+         const TelephonySessionPrx& sinkSession)
+    {
+        if (sourceSession->ice_getIdentity() == sinkSession->ice_getIdentity())
+        {
+            // Not connected to ourselves.
+            return;
+        }
+
+        TelephonyEventSinkSeq sinksToRemove = sinkSession->getSinks();
+        TelephonyEventSourceSeq fromSources = sourceSession->getSources();
+
+        for(TelephonyEventSourceSeq::iterator i=fromSources.begin();   
+            i != fromSources.end(); ++i)
+        {
+                (*i)->removeSinks(sinksToRemove);
+        }
+    }
+
+private:
+    BridgeImplPtr mBridge;
+    SessionSeq mDisconnectingSessions;
+    Logger mLogger;
+};
+
+
 } // End of anonymous namespace
 
 BridgeImpl::BridgeImpl(const string& name, const Ice::ObjectAdapterPtr& adapter, 
@@ -1082,6 +1251,7 @@ void BridgeImpl::addSessions_async(const AMD_Bridge_addSessionsPtr& callback, co
 	tasks.push_back(new SetAndGetSessionControllerTask(mObjAdapter, mSessions, sessions, this, mLogger));
         tasks.push_back(new GenericAMDCallback<AMD_Bridge_addSessionsPtr>(callback, tracker));
         tasks.push_back(new SetupMedia(this));
+        tasks.push_back(new ConnectTelephonyEventsTask(this, sessions, mLogger));
         tasks.push_back(new UpdateTask(this));
         ExecutorPtr executor(new Executor(tasks, mLogger));
         executor->start();
@@ -1141,6 +1311,7 @@ void BridgeImpl::removeSessions_async(const AMD_Bridge_removeSessionsPtr& callba
         }
         tasks.push_back(new RemoveSessionsNotify(mListeners, removed, cookies));
         tasks.push_back(new GenericAMDCallback<AMD_Bridge_removeSessionsPtr>(callback, removed));
+        tasks.push_back(new DisconnectTelephonyEventsTask(this, sessions, mLogger));
         tasks.push_back(new CheckShutdown(this, mPrx));
         ExecutorPtr runner(new Executor(tasks, mLogger));
         runner->start();
@@ -1342,6 +1513,10 @@ void BridgeImpl::replaceSession_async(const AMD_Bridge_replaceSessionPtr& callba
             boost::shared_lock<boost::shared_mutex> lock(mLock);
             cookies = getCookies();
         }
+
+        SessionSeq removedSessions;
+        removedSessions.push_back(sessionToReplace);
+
         tasks.push_back(new RemoveSessionsNotify(mListeners, removeTracker, cookies));
         tasks.push_back(new UnplugMedia(this));
         tasks.push_back(new SetBridgeTask(mSessions, mPrx, mSessionListenerPrx, newSessions, tracker));
@@ -1349,6 +1524,8 @@ void BridgeImpl::replaceSession_async(const AMD_Bridge_replaceSessionPtr& callba
 	tasks.push_back(new SetAndGetSessionControllerTask(mObjAdapter, mSessions, newSessions, this, mLogger));
         tasks.push_back(new GenericAMDCallback<AMD_Bridge_replaceSessionPtr>(callback, tracker));
         tasks.push_back(new SetupMedia(this));
+        tasks.push_back(new ConnectTelephonyEventsTask(this, newSessions, mLogger));
+        tasks.push_back(new DisconnectTelephonyEventsTask(this, removedSessions, mLogger));
         tasks.push_back(new UpdateTask(this));
         ExecutorPtr executor(new Executor(tasks, mLogger));
         executor->start();
@@ -1575,6 +1752,7 @@ void BridgeImpl::getAddSessionsTasks(QueuedTasks& tasks,
     tasks.push_back(new AddToListeners(mListeners, tracker, getCookies()));
     tasks.push_back(new SetAndGetSessionControllerTask(mObjAdapter, mSessions, sessions, this, mLogger));
     tasks.push_back(new SetupMedia(this));
+    tasks.push_back(new ConnectTelephonyEventsTask(this, sessions, mLogger));
     tasks.push_back(new UpdateTask(this));
 }
 
diff --git a/src/SessionOperations.cpp b/src/SessionOperations.cpp
index e6bf5f8..38c0184 100644
--- a/src/SessionOperations.cpp
+++ b/src/SessionOperations.cpp
@@ -233,3 +233,106 @@ void RemoveStreamsOperation::removed(const SessionWrapperPtr& session)
 void RemoveStreamsOperation::failed(const Ice::Exception&, const SessionWrapperPtr&)
 {
 }
+
+ConnectTelephonyOperation::ConnectTelephonyOperation(const TelephonySessionPrx& sessionToConnect,
+                                                     const vector<TelephonySessionPrx>& ignoreList,
+                                                     const Logger& logger)
+                                                       : mSessionToConnect(sessionToConnect),
+                                                         mIgnoreList(ignoreList),
+                                                         mLogger(logger)
+{
+}
+
+void ConnectTelephonyOperation::operator()(const SessionWrapperPtr& session)
+{
+    if (session->getSession()->ice_getIdentity() == mSessionToConnect->ice_getIdentity())
+    {
+        // Let's not connect a session to itself.
+        return;
+    }
+
+    TelephonySessionPrx visitedTelephonySession = TelephonySessionPrx::checkedCast(session->getSession());
+    if (visitedTelephonySession == 0)
+    {
+        // If the session being visited isn't telephony, nothing to do. 
+        return;
+    }
+
+    if (inIgnoreList(visitedTelephonySession))
+    {
+        return;
+    }
+
+    connectSinks(mSessionToConnect, visitedTelephonySession);
+    connectSinks(visitedTelephonySession, mSessionToConnect);
+
+}
+
+bool ConnectTelephonyOperation::inIgnoreList(const TelephonySessionPrx& session)
+{
+    for(vector<TelephonySessionPrx>::iterator i=mIgnoreList.begin();   
+        i != mIgnoreList.end(); ++i)
+    {
+        if (session->ice_getIdentity() == (*i)->ice_getIdentity())
+        {
+            return true;
+        }
+    }
+
+    return false;
+}
+
+void ConnectTelephonyOperation::connectSinks
+    (const TelephonySessionPrx& sourceSession, 
+     const TelephonySessionPrx& sinkSession)
+{
+    TelephonyEventSinkSeq sinksToAdd = sinkSession->getSinks();
+    TelephonyEventSourceSeq toSources = sourceSession->getSources();
+
+    for(TelephonyEventSourceSeq::iterator i=toSources.begin();   
+        i != toSources.end(); ++i)
+    {
+            (*i)->addSinks(sinksToAdd);
+    }
+}
+
+DisconnectTelephonyOperation::DisconnectTelephonyOperation(const TelephonySessionPrx& sessionToDisconnect,
+                                                           const Logger& logger)
+                                                       : mSessionToDisconnect(sessionToDisconnect),
+                                                         mLogger(logger)
+{
+}
+
+void DisconnectTelephonyOperation::operator()(const SessionWrapperPtr& visitedSession)
+{
+    if (visitedSession->getSession()->ice_getIdentity() == mSessionToDisconnect->ice_getIdentity())
+    {
+        // Not connected to ourselves.
+        return;
+    }
+
+    TelephonySessionPrx visitedTelephonySession = TelephonySessionPrx::checkedCast(visitedSession->getSession());
+    if (visitedTelephonySession == 0)
+    {
+        // If the session being visited isn't telephony, nothing to do. 
+        return;
+    }
+
+    disconnectSinks(mSessionToDisconnect, visitedTelephonySession);
+    disconnectSinks(visitedTelephonySession, mSessionToDisconnect);
+
+}
+
+void DisconnectTelephonyOperation::disconnectSinks
+    (const TelephonySessionPrx& sourceSession, 
+     const TelephonySessionPrx& sinkSession)
+{
+    TelephonyEventSinkSeq sinksToRemove = sinkSession->getSinks();
+    TelephonyEventSourceSeq fromSources = sourceSession->getSources();
+
+    for(TelephonyEventSourceSeq::iterator i=fromSources.begin();   
+        i != fromSources.end(); ++i)
+    {
+            (*i)->removeSinks(sinksToRemove);
+    }
+}
diff --git a/src/SessionOperations.h b/src/SessionOperations.h
index a4e2c29..042fdce 100644
--- a/src/SessionOperations.h
+++ b/src/SessionOperations.h
@@ -117,6 +117,44 @@ private:
 
 typedef IceUtil::Handle<AddStreamsOperation> AddStreamsOperationPtr;
 
+class ConnectTelephonyOperation : public std::unary_function<SessionWrapperPtr, void>, public Ice::Object
+{
+public:
+    ConnectTelephonyOperation(const AsteriskSCF::SessionCommunications::V1::TelephonySessionPrx& sessionToConnect,
+                              const std::vector<AsteriskSCF::SessionCommunications::V1::TelephonySessionPrx>& ignoreList,
+                              const AsteriskSCF::System::Logging::Logger& logger);
+
+    void operator()(const SessionWrapperPtr& session);
+
+private:
+    void connectSinks(const AsteriskSCF::SessionCommunications::V1::TelephonySessionPrx& sourceSession, 
+                      const AsteriskSCF::SessionCommunications::V1::TelephonySessionPrx& sinkSession);
+
+    bool inIgnoreList(const AsteriskSCF::SessionCommunications::V1::TelephonySessionPrx& session);
+
+    AsteriskSCF::SessionCommunications::V1::TelephonySessionPrx mSessionToConnect;
+    std::vector<AsteriskSCF::SessionCommunications::V1::TelephonySessionPrx> mIgnoreList;
+    AsteriskSCF::System::Logging::Logger mLogger;
+};
+typedef IceUtil::Handle<ConnectTelephonyOperation> ConnectTelephonyOperationPtr;
+
+class DisconnectTelephonyOperation : public std::unary_function<SessionWrapperPtr, void>, public Ice::Object
+{
+public:
+    DisconnectTelephonyOperation(const AsteriskSCF::SessionCommunications::V1::TelephonySessionPrx& sessionToDisconnect,
+                                 const AsteriskSCF::System::Logging::Logger& logger);
+
+    void operator()(const SessionWrapperPtr& session);
+
+private:
+    void disconnectSinks(const AsteriskSCF::SessionCommunications::V1::TelephonySessionPrx& sourceSession, 
+                         const AsteriskSCF::SessionCommunications::V1::TelephonySessionPrx& sinkSession);
+
+    AsteriskSCF::SessionCommunications::V1::TelephonySessionPrx mSessionToDisconnect;
+    AsteriskSCF::System::Logging::Logger mLogger;
+};
+typedef IceUtil::Handle<DisconnectTelephonyOperation> DisconnectTelephonyOperationPtr;
+
 class RemoveStreamsOperation : public std::unary_function<SessionWrapperPtr, void>, public Ice::Object
 {
 public:
diff --git a/test/TestBridging.cpp b/test/TestBridging.cpp
index b7efa3f..0476241 100644
--- a/test/TestBridging.cpp
+++ b/test/TestBridging.cpp
@@ -1351,6 +1351,105 @@ public:
         }
     }
 
+    void telephonyConnectTest()
+    {
+        try
+        {
+            IceEnvironment testEnv(env()->properties());
+            try
+            {
+                Ice::ObjectAdapterPtr testAdapter =  testEnv.communicator()->createObjectAdapter("TestUtilAdapter");
+                testAdapter->activate();
+                BridgeManagerListenerIPtr servant = new BridgeManagerListenerI;
+                AsteriskSCF::SessionCommunications::V1::BridgeManagerListenerPrx listenerPrx;
+                addServant(listenerPrx, testAdapter, servant, testEnv.strToIdent(IceUtil::generateUUID()));
+
+                BridgeManagerPrx mgrPrx = env()->primaryBridgeManager();
+                BOOST_CHECK(mgrPrx);
+                mgrPrx->addListener(listenerPrx);
+
+                SessionSeq sessions;
+                TestChannelWrapper channel(env()->properties());
+
+                SessionPrx a = channel.getSession("311");
+                TelephonySessionPrx ta = TelephonySessionPrx::checkedCast(a);
+                BOOST_CHECK(ta != 0);
+
+                TelephonyEventSourceSeq aSources = ta->getSources();
+                TelephonyEventSinkSeq aSinks = ta->getSinks();
+                BOOST_CHECK(aSources.size() == 1);
+                BOOST_CHECK(aSinks.size() == 1);
+                TelephonyEventSinkSeq aConnected = aSources[0]->getSinks();
+                BOOST_CHECK(aConnected.size() == 0);
+
+                SessionPrx b = channel.getSession("312");
+                TelephonySessionPrx tb = TelephonySessionPrx::checkedCast(b);
+                BOOST_CHECK(tb != 0);
+
+                TelephonyEventSourceSeq bSources = tb->getSources();
+                TelephonyEventSinkSeq bSinks = tb->getSinks();
+                BOOST_CHECK(bSources.size() == 1);
+                BOOST_CHECK(bSinks.size() == 1);
+                TelephonyEventSinkSeq bConnected = bSources[0]->getSinks();
+                BOOST_CHECK(bConnected.size() == 0);
+
+                sessions.push_back(a);
+                sessions.push_back(b);
+                BridgePrx bridge(mgrPrx->createBridge(sessions, 0));
+
+                IceUtil::ThreadControl::sleep(IceUtil::Time::seconds(2));
+
+                // Now that we're bridged, ensure the sinks are connected as expected.
+                aConnected = aSources[0]->getSinks();
+                BOOST_CHECK(aConnected.size() == 1);
+                BOOST_CHECK(aConnected[0]->ice_getIdentity() == bSinks[0]->ice_getIdentity());
+
+                bConnected = bSources[0]->getSinks();
+                BOOST_CHECK(bConnected.size() == 1);
+                BOOST_CHECK(bConnected[0]->ice_getIdentity() == aSinks[0]->ice_getIdentity());
+
+                SessionPrx c = channel.getSession("314");
+                TelephonySessionPrx tc = TelephonySessionPrx::checkedCast(c);
+                BOOST_CHECK(tc != 0);
+
+                sessions.clear();
+                sessions.push_back(c);
+                TelephonyEventSourceSeq cSources = tc->getSources();
+                TelephonyEventSinkSeq cSinks = tc->getSinks();
+
+                bridge->replaceSession(b, sessions);
+
+                IceUtil::ThreadControl::sleep(IceUtil::Time::seconds(2));
+
+                // We've replaced b with c. Make sure all is well, including disconnecting b. 
+                aConnected = aSources[0]->getSinks();
+                BOOST_CHECK(aConnected.size() == 1);
+                BOOST_CHECK(aConnected[0]->ice_getIdentity() == cSinks[0]->ice_getIdentity());
+
+                TelephonyEventSinkSeq cConnected = cSources[0]->getSinks();
+                BOOST_CHECK(cConnected.size() == 1);
+                BOOST_CHECK(cConnected[0]->ice_getIdentity() == aSinks[0]->ice_getIdentity());
+
+                bConnected = bSources[0]->getSinks();
+                BOOST_CHECK(bConnected.size() == 0);
+            }
+            catch (const Ice::Exception& ex)
+            {
+                std::ostringstream msg;
+                msg << "Unexpected Ice exception " << ex.what();
+                BOOST_FAIL(msg.str());
+            }
+            catch (...)
+            {
+                BOOST_FAIL("Unexpected exception");
+            }
+        }
+        catch (...)
+        {
+            BOOST_FAIL("Unexpected exception");
+        }
+    }
+
 private:
     TestEnvironmentPtr mTestEnvironment;
 };
@@ -1361,6 +1460,8 @@ bool init_unit_test()
 {
     boost::shared_ptr<BridgeTester> bridgeTester(new BridgeTester(globalTestEnvironment));
     framework::master_test_suite().
+        add(BOOST_TEST_CASE(boost::bind(&BridgeTester::telephonyConnectTest, bridgeTester)));
+    framework::master_test_suite().
         add(BOOST_TEST_CASE(boost::bind(&BridgeTester::createEmptyBridge, bridgeTester)));
     framework::master_test_suite().
         add(BOOST_TEST_CASE(boost::bind(&BridgeTester::simpleBridgingTest, bridgeTester)));

-----------------------------------------------------------------------


-- 
asterisk-scf/release/bridging.git



More information about the asterisk-scf-commits mailing list