[asterisk-scf-commits] asterisk-scf/release/bridging.git branch "master" updated.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Wed May 25 09:41:36 CDT 2011


branch "master" has been updated
       via  9f78b40761e71cccf0e0e85e0f89235a2f77583f (commit)
      from  231250c599d86a6bec9c0cc6038ed2e33a9feb3f (commit)

Summary of changes:
 src/BridgeImpl.cpp        |    7 +++-
 src/SessionListener.cpp   |   96 ++++++++++++++++++++++++++++++++++++++++-----
 src/SessionOperations.cpp |   12 +++--
 src/SessionOperations.h   |   10 +++-
 src/SessionWrapper.cpp    |   81 +++++++++++++++++++++++++-------------
 src/SessionWrapper.h      |    2 +-
 test/TestBridging.cpp     |    2 +
 7 files changed, 163 insertions(+), 47 deletions(-)


- Log -----------------------------------------------------------------
commit 9f78b40761e71cccf0e0e85e0f89235a2f77583f
Author: Brent Eagles <beagles at digium.com>
Date:   Wed May 25 12:09:35 2011 -0230

    Fixing bugs with indications.
     - Added translation of indications, and pass-through of indications that do not require further action on
       the bridging side.
     - Fixed terminal session wrapper operation so that it would not repeat work if the wrapper was already
       set to the disconnected state.
     - Changed order of operations on shutdown so media would be disconnected before the stop indication is
       passed along.

diff --git a/src/BridgeImpl.cpp b/src/BridgeImpl.cpp
index 15dd93a..0cf2807 100755
--- a/src/BridgeImpl.cpp
+++ b/src/BridgeImpl.cpp
@@ -651,7 +651,12 @@ void BridgeImpl::shutdown(const Ice::Current& current)
     pushUpdate(update);
     update = 0;
     {
-        ShutdownSessionOperation shutdownOp(mSessionListenerPrx, new ResponseCode, mLogger);
+        //
+        // Currently the slice defines the response "Normal Clearing" as the default
+        // value so we shouldn't need to set anything here.
+        //
+        ResponseCodePtr responseCode = new ResponseCode;
+        ShutdownSessionOperation shutdownOp(mSessionListenerPrx, responseCode, mLogger);
         mSessions->visitSessions(shutdownOp);
         mListeners->stopped();
         boost::unique_lock<boost::shared_mutex> lock(mLock);
diff --git a/src/SessionListener.cpp b/src/SessionListener.cpp
index 585f9f4..7aee4df 100644
--- a/src/SessionListener.cpp
+++ b/src/SessionListener.cpp
@@ -28,6 +28,77 @@ using namespace std;
 namespace
 {
 
+template <class FromT, class ToT>
+void indicationCopy(const FromT&, const ToT&)
+{
+    //
+    // NO-OP.
+    //
+}
+
+template <>
+void indicationCopy(const ProgressingIndicationPtr& source, const ProgressIndicationPtr& destination)
+{
+    destination->response = source->response;
+}
+
+template <class FromT, class ToT> 
+ToT translateImpl(const IndicationPtr& f)
+{
+    FromT candidate = FromT::dynamicCast(f);
+    if (candidate)
+    {
+        typedef typename ToT::element_type Tval;
+        ToT result = new Tval;
+        indicationCopy(candidate, result);
+        return result;
+    }
+    return 0;
+}
+
+IndicationPtr translate(const AsteriskSCF::SessionCommunications::V1::IndicationPtr& source)
+{
+    {
+        FlashIndicationPtr indication = translateImpl<FlashedIndicationPtr, FlashIndicationPtr>(source);
+        if (indication)
+        {
+            return indication;
+        }
+    }
+    {
+        HoldIndicationPtr indication = translateImpl<HeldIndicationPtr, HoldIndicationPtr>(source);
+        if (indication)
+        {
+            return indication;
+        }
+    }
+    {
+        UnholdIndicationPtr indication = translateImpl<UnheldIndicationPtr, UnholdIndicationPtr>(source);
+        if (indication)
+        {
+            return indication;
+        }
+    }
+    {
+        RingIndicationPtr indication = translateImpl<RingingIndicationPtr, RingIndicationPtr>(source);
+        if (indication)
+        {
+            return indication;
+        }
+    }
+    {
+        ProgressIndicationPtr indication = translateImpl<ProgressingIndicationPtr, ProgressIndicationPtr>(source);
+        if (indication)
+        {
+            return indication;
+        }
+    }
+    //
+    // TODO: This probably shouldn't happen.
+    //
+    return source;
+}
+
 //
 // For events that require modification to the bridge, we use helper methods on the bridge itself.
 // For events result in distribution to the bridge sessions, we copy the current sessions and
@@ -75,14 +146,6 @@ public:
 		throw;
 	    }
 	}
-	else if ((ringing = AsteriskSCF::SessionCommunications::V1::RingingIndicationPtr::dynamicCast(indication)))
-	{
-	    //
-	    // TODO: Who gets the ring notifications will likely depend on configuration, etc.
-	    //
-	    RingSessionOperation ringer(source, mLogger);
-	    mSessions->visitSessions(ringer);
-	}
 	else if ((stopped = AsteriskSCF::SessionCommunications::V1::StoppedIndicationPtr::dynamicCast(indication)))
 	{
 	    string proxyString = source->ice_toString();
@@ -114,7 +177,6 @@ public:
             //
             // This returns only the active sessions.
             //
-            SessionSeq currentSessions = mSessions->getSessionSeq();
 	    
 	    //
 	    // Shutdown is handled asynchronously, so there won't be any exceptions that need to be caught here.
@@ -122,6 +184,9 @@ public:
 	    //
 	    session->shutdown(mListenerPrx, ResponseCodePtr());
 
+            ShutdownSessionOperation shutdownOp(mListenerPrx, stopped->response, mLogger);
+            mSessions->visitSessions(shutdownOp);
+
             //
             // Ideally, we'd wait for an affirmative response from the session being removed from the bridge
             // before doing this next part. 
@@ -130,11 +195,13 @@ public:
             {
                 try
                 {
+                    SessionSeq currentSessions = mSessions->getSessionSeq();
+
                     //
                     // TODO: Should be determined by policy. 2 is the proper upper limit because 
                     // the session in question will not have been removed from the bridge yet.
                     //
-                    if (currentSessions.size() <= 2)
+                    if (currentSessions.size() < 2)
                     {
                         mBridgePrx->shutdown();
                     }
@@ -145,6 +212,15 @@ public:
                 }
             }
 	}
+        else
+        {
+            IndicationPtr commandIndication = translate(indication);
+            //
+            // TODO: Who gets the ring notifications will likely depend on configuration, etc.
+            //
+            RelayIndication relayer(source, mLogger, commandIndication, false);
+            mSessions->visitSessions(relayer);
+        }
     }
 
     void onActivate(const BridgePrx& bridgePrx, const SessionListenerPrx& listenerPrx)
diff --git a/src/SessionOperations.cpp b/src/SessionOperations.cpp
index 56a5c5a..0ef489a 100644
--- a/src/SessionOperations.cpp
+++ b/src/SessionOperations.cpp
@@ -54,23 +54,25 @@ void ShutdownSessionOperation::operator()(const SessionWrapperPtr& wrapper)
     wrapper->shutdown(mListener, mResponse);
 }
 
-RingSessionOperation::RingSessionOperation(const SessionPrx& exclude, const Logger& logger) :
+RelayIndication::RelayIndication(const SessionPrx& exclude, const Logger& logger, const IndicationPtr& indication, bool includeConnected) :
     mExclude(exclude->ice_getIdentity()),
-    mLogger(logger)
+    mLogger(logger),
+    mIndication(indication),
+    mIncludeConnected(includeConnected)
 {
 }
 
-void RingSessionOperation::operator()(const SessionWrapperPtr& session)
+void RelayIndication::operator()(const SessionWrapperPtr& session)
 {
     SessionPrx s = session->getSession();
-    if (s->ice_getIdentity() != mExclude)
+    if (s->ice_getIdentity() != mExclude && (mIncludeConnected || !session->isConnected()))
     {
         try
         {
             //
             // TODO: AMI.. or would this be better as a oneway. Do we care if we get a response etc?
             //
-	    s->indicate(new AsteriskSCF::SessionCommunications::V1::RingIndication());
+	    s->indicate(mIndication);
         }
         catch (const Ice::ObjectNotExistException& ex)
         {
diff --git a/src/SessionOperations.h b/src/SessionOperations.h
index 1976059..cdf4b82 100644
--- a/src/SessionOperations.h
+++ b/src/SessionOperations.h
@@ -53,17 +53,21 @@ private:
     AsteriskSCF::System::Logging::Logger mLogger;
 };
 
-class RingSessionOperation : public std::unary_function<SessionWrapperPtr, void>
+class RelayIndication : public std::unary_function<SessionWrapperPtr, void>
 {
 public:
-    RingSessionOperation(const AsteriskSCF::SessionCommunications::V1::SessionPrx& exclude,
-            const AsteriskSCF::System::Logging::Logger& logger);
+    RelayIndication(const AsteriskSCF::SessionCommunications::V1::SessionPrx& exclude,
+            const AsteriskSCF::System::Logging::Logger& logger,
+            const AsteriskSCF::SessionCommunications::V1::IndicationPtr& indication,
+            bool includeConnected);
 
     void operator()(const SessionWrapperPtr& session);
 
 private:
     Ice::Identity mExclude;
     AsteriskSCF::System::Logging::Logger mLogger;
+    AsteriskSCF::SessionCommunications::V1::IndicationPtr mIndication;
+    bool mIncludeConnected;
 };
 
 class IfStateCriteria
diff --git a/src/SessionWrapper.cpp b/src/SessionWrapper.cpp
index c447486..883c614 100644
--- a/src/SessionWrapper.cpp
+++ b/src/SessionWrapper.cpp
@@ -18,6 +18,7 @@
 #include <Ice/Ice.h>
 #include <AsteriskSCF/logger.h>
 #include "ServiceUtil.h"
+#include <set>
 
 using namespace AsteriskSCF::System::Logging;
 using namespace AsteriskSCF::SessionCommunications::V1;
@@ -28,6 +29,9 @@ using namespace std;
 
 namespace
 {
+
+typedef set<BridgedSessionState> StateSet;
+
 //
 // NOTE: This object is used in support of making connect() calls on sessions via AMI. It's a fair candidate for a
 // singleton as you could pass the session as a cookie to the AMI call. However, it is desirable to have some logging,
@@ -124,7 +128,6 @@ protected:
 
     void failed(const Ice::Exception& ex)
     {
-
         try
         {
             ex.ice_throw();
@@ -178,7 +181,6 @@ public:
 protected:
     bool executeImpl()
     {
-
         mSession->getSession()->begin_stop(mCode,
             newCallback_Session_stop(this, &SessionStopTask::removed,
                     &SessionStopTask::failed));
@@ -241,6 +243,38 @@ private:
     AsteriskSCF::Bridge::V1::BridgedSessionState mOldState;
 };
 
+class SetStateFromTask : public QueuedTask
+{
+public:
+    SetStateFromTask(const SessionWrapperPtr& session,
+            const AsteriskSCF::Bridge::V1::BridgedSessionState newState,
+            const StateSet& startStates) :
+        QueuedTask("SetStateFromTask"),
+        mSession(session),
+        mState(newState),
+        mStartStates(startStates)
+    {
+    }
+
+protected:
+    bool executeImpl()
+    {
+        BridgedSessionState oldState = mSession->setState(mState);
+        if (mStartStates.find(oldState) == mStartStates.end())
+        {
+            mListener->failed();
+            return false;
+        }
+        return true;
+    }
+
+private:
+    SessionWrapperPtr mSession;
+    AsteriskSCF::Bridge::V1::BridgedSessionState mState;
+    AsteriskSCF::Bridge::V1::BridgedSessionState mOldState;
+    StateSet mStartStates;
+};
+
 class ShutdownMediaTask : public QueuedTask
 {
 public:
@@ -250,25 +284,16 @@ public:
     }
     
 protected:
+    //
+    // NOTE: This has been changed from async to sync for the time being.. the unplugMedia
+    // call is inherently async, so we can just go on.
+    //
     bool executeImpl()
     {
         mSession->unplugMedia();
-        return false;
-    }
-
-    void done()
-    {
-        mListener->succeeded();
+        return true;
     }
 
-    void failed(const Ice::Exception&)
-    {
-        //
-        // TODO: Log exception.
-        //
-        mListener->failed();
-    }
-    
 private:
     SessionWrapperPtr mSession;
 };
@@ -306,26 +331,28 @@ QueuedTasks createShutdownTasks(const SessionWrapperPtr& session, const SessionL
         const ResponseCodePtr& code)
 {
     //
-    // Tasks are a queued, so they go in order that they will be processed.
+    // We use this task instead of changing states because it will prevent the
+    // queue from continuing if the session is already connected or done.
     //
+    StateSet statesToContinueOn;
+    statesToContinueOn.insert(::AsteriskSCF::Bridge::V1::Added);
+    statesToContinueOn.insert(::AsteriskSCF::Bridge::V1::Connected);
+
     QueuedTasks tasks;
-    tasks.push_back(new SetStateTask(session, Disconnected));
+    tasks.push_back(new SetStateFromTask(session, ::AsteriskSCF::Bridge::V1::Disconnected, statesToContinueOn));
     tasks.push_back(new RemoveBridgeTask(session, listener));
     if (code)
     {
+        tasks.push_back(new ShutdownMediaTask(session));
         tasks.push_back(new SessionStopTask(session, code));
     }
-    //
-    // TODO: These two tasks should be reversed really.
-    //
-    tasks.push_back(new ShutdownMediaTask(session));
     return tasks;
 }
 
 QueuedTasks createSetupTasks(const SessionWrapperPtr& session)
 {
     QueuedTasks tasks;
-    tasks.push_back(new SetStateTask(session, Connected));
+    tasks.push_back(new SetStateTask(session, ::AsteriskSCF::Bridge::V1::Connected));
     tasks.push_back(new ConnectMediaTask(session));
     return tasks;
 }
@@ -532,19 +559,19 @@ void SessionWrapper::shutdown(const SessionListenerPrx& listener, const Response
     // shutdownRunner->start();
 }
 
-void SessionWrapper::setState(const AsteriskSCF::Bridge::V1::BridgedSessionState newState)
+AsteriskSCF::Bridge::V1::BridgedSessionState SessionWrapper::setState(const AsteriskSCF::Bridge::V1::BridgedSessionState newState)
 {
     mLogger(Debug) << FUNLOG << ": updating state " << mId;
     BridgedSessionPtr copyOfNewState;
+    AsteriskSCF::Bridge::V1::BridgedSessionState oldState;
     {
         boost::unique_lock<boost::shared_mutex> lock(mLock);
-        //
-        // TODO:
-        //
+        oldState = mSession->currentState;
         mSession->currentState = newState;
         copyOfNewState = createUpdate();
     }
     pushUpdate(copyOfNewState);
+    return oldState;
 }
 
 void SessionWrapper::unplugMedia()
diff --git a/src/SessionWrapper.h b/src/SessionWrapper.h
index 0c98112..e0a8835 100644
--- a/src/SessionWrapper.h
+++ b/src/SessionWrapper.h
@@ -127,7 +127,7 @@ public:
     //
     // TODO: Refactor so these methods don't need to be exposed.
     //
-    void setState(const AsteriskSCF::Bridge::V1::BridgedSessionState newState);
+    AsteriskSCF::Bridge::V1::BridgedSessionState setState(const AsteriskSCF::Bridge::V1::BridgedSessionState newState);
 
     /**
      * Disconnection helper.
diff --git a/test/TestBridging.cpp b/test/TestBridging.cpp
index a2ad429..f5569e9 100644
--- a/test/TestBridging.cpp
+++ b/test/TestBridging.cpp
@@ -420,6 +420,7 @@ public:
 
                 BOOST_CHECK(servant->createCalls() == 1);
                 
+                IceUtil::ThreadControl::sleep(IceUtil::Time::seconds(2));
                 channel.commands()->getlog(idA, log);
                 bool findStop = find(log, "stop");
                 if (!findStop)
@@ -615,6 +616,7 @@ public:
                 BOOST_CHECK(bridgeListener->addedCount() == 2); 
                 bridge->shutdown();
 
+                IceUtil::ThreadControl::sleep(IceUtil::Time::seconds(2));
                 BOOST_CHECK(servant->createCalls() == 1);
                 channel.commands()->getlog(idA, log);
                 bool findStop = find(log, "stop");

-----------------------------------------------------------------------


-- 
asterisk-scf/release/bridging.git



More information about the asterisk-scf-commits mailing list