[asterisk-scf-commits] asterisk-scf/release/bridging.git branch "master" updated.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Wed May 25 09:41:36 CDT 2011
branch "master" has been updated
via 9f78b40761e71cccf0e0e85e0f89235a2f77583f (commit)
from 231250c599d86a6bec9c0cc6038ed2e33a9feb3f (commit)
Summary of changes:
src/BridgeImpl.cpp | 7 +++-
src/SessionListener.cpp | 96 ++++++++++++++++++++++++++++++++++++++++-----
src/SessionOperations.cpp | 12 +++--
src/SessionOperations.h | 10 +++-
src/SessionWrapper.cpp | 81 +++++++++++++++++++++++++-------------
src/SessionWrapper.h | 2 +-
test/TestBridging.cpp | 2 +
7 files changed, 163 insertions(+), 47 deletions(-)
- Log -----------------------------------------------------------------
commit 9f78b40761e71cccf0e0e85e0f89235a2f77583f
Author: Brent Eagles <beagles at digium.com>
Date: Wed May 25 12:09:35 2011 -0230
Fixing bugs with indications.
- Added indication of translations and passes through indications that do not require further action on
the bridging side.
- Fixed terminal session wrapper operation so that it would not repeat work if the wrapper was already
set to the disconnected state.
- Changed order of operations on shutdown so media would be disconnected before the stop indication was already
passed along
diff --git a/src/BridgeImpl.cpp b/src/BridgeImpl.cpp
index 15dd93a..0cf2807 100755
--- a/src/BridgeImpl.cpp
+++ b/src/BridgeImpl.cpp
@@ -651,7 +651,12 @@ void BridgeImpl::shutdown(const Ice::Current& current)
pushUpdate(update);
update = 0;
{
- ShutdownSessionOperation shutdownOp(mSessionListenerPrx, new ResponseCode, mLogger);
+ //
+ // Currently the slice defines the response "Normal Clearing" as the default
+ // value so we shouldn't need to set anything here.
+ //
+ ResponseCodePtr responseCode = new ResponseCode;
+ ShutdownSessionOperation shutdownOp(mSessionListenerPrx, responseCode, mLogger);
mSessions->visitSessions(shutdownOp);
mListeners->stopped();
boost::unique_lock<boost::shared_mutex> lock(mLock);
diff --git a/src/SessionListener.cpp b/src/SessionListener.cpp
index 585f9f4..7aee4df 100644
--- a/src/SessionListener.cpp
+++ b/src/SessionListener.cpp
@@ -28,6 +28,77 @@ using namespace std;
namespace
{
+template <class FromT, class ToT>
+void indicationCopy(const FromT&, const ToT&)
+{
+ //
+ // NO-OP.
+ //
+}
+
+template <>
+void indicationCopy(const ProgressingIndicationPtr& source, const ProgressIndicationPtr& destination)
+{
+ destination->response = source->response;
+}
+
+template <class FromT, class ToT>
+ToT translateImpl(const IndicationPtr& f)
+{
+ FromT candidate = FromT::dynamicCast(f);
+ if (candidate)
+ {
+ typedef typename ToT::element_type Tval;
+ ToT result = new Tval;
+ indicationCopy(candidate, result);
+ return result;
+ }
+ return 0;
+}
+
+IndicationPtr translate(const AsteriskSCF::SessionCommunications::V1::IndicationPtr& source)
+{
+ {
+ FlashIndicationPtr indication = translateImpl<FlashedIndicationPtr, FlashIndicationPtr>(source);
+ if (indication)
+ {
+ return indication;
+ }
+ }
+ {
+ HoldIndicationPtr indication = translateImpl<HeldIndicationPtr, HoldIndicationPtr>(source);
+ if (indication)
+ {
+ return indication;
+ }
+ }
+ {
+ UnholdIndicationPtr indication = translateImpl<UnheldIndicationPtr, UnholdIndicationPtr>(source);
+ if (indication)
+ {
+ return indication;
+ }
+ }
+ {
+ RingIndicationPtr indication = translateImpl<RingingIndicationPtr, RingIndicationPtr>(source);
+ if (indication)
+ {
+ return indication;
+ }
+ }
+ {
+ ProgressIndicationPtr indication = translateImpl<ProgressingIndicationPtr, ProgressIndicationPtr>(source);
+ if (indication)
+ {
+ return indication;
+ }
+ }
+ //
+ // TODO: This probably shoudn't happen.
+ //
+ return source;
+}
+
//
// For events that require modification to the bridge, we use helper methods on the bridge itself.
// For events result in distribution to the bridge sessions, we copy the current sessions and
@@ -75,14 +146,6 @@ public:
throw;
}
}
- else if ((ringing = AsteriskSCF::SessionCommunications::V1::RingingIndicationPtr::dynamicCast(indication)))
- {
- //
- // TODO: Who gets the ring notifications will likely depend on configuration, etc.
- //
- RingSessionOperation ringer(source, mLogger);
- mSessions->visitSessions(ringer);
- }
else if ((stopped = AsteriskSCF::SessionCommunications::V1::StoppedIndicationPtr::dynamicCast(indication)))
{
string proxyString = source->ice_toString();
@@ -114,7 +177,6 @@ public:
//
// This returns only the active sessions.
//
- SessionSeq currentSessions = mSessions->getSessionSeq();
//
// Shutdown is handled asynchronously, so there won't be any exceptions that need to be caught here.
@@ -122,6 +184,9 @@ public:
//
session->shutdown(mListenerPrx, ResponseCodePtr());
+ ShutdownSessionOperation shutdownOp(mListenerPrx, stopped->response, mLogger);
+ mSessions->visitSessions(shutdownOp);
+
//
// Ideally, we'd wait for an affirmative response from the session being removed from the bridge
// before doing this next part.
@@ -130,11 +195,13 @@ public:
{
try
{
+ SessionSeq currentSessions = mSessions->getSessionSeq();
+
//
// TODO: Should be determined by policy. 2 is the proper upper limit because
// the session in question will not have been removed from the bridge yet.
//
- if (currentSessions.size() <= 2)
+ if (currentSessions.size() < 2)
{
mBridgePrx->shutdown();
}
@@ -145,6 +212,15 @@ public:
}
}
}
+ else
+ {
+ IndicationPtr commandIndication = translate(indication);
+ //
+ // TODO: Who gets the ring notifications will likely depend on configuration, etc.
+ //
+ RelayIndication relayer(source, mLogger, commandIndication, false);
+ mSessions->visitSessions(relayer);
+ }
}
void onActivate(const BridgePrx& bridgePrx, const SessionListenerPrx& listenerPrx)
diff --git a/src/SessionOperations.cpp b/src/SessionOperations.cpp
index 56a5c5a..0ef489a 100644
--- a/src/SessionOperations.cpp
+++ b/src/SessionOperations.cpp
@@ -54,23 +54,25 @@ void ShutdownSessionOperation::operator()(const SessionWrapperPtr& wrapper)
wrapper->shutdown(mListener, mResponse);
}
-RingSessionOperation::RingSessionOperation(const SessionPrx& exclude, const Logger& logger) :
+RelayIndication::RelayIndication(const SessionPrx& exclude, const Logger& logger, const IndicationPtr& indication, bool includeConnected) :
mExclude(exclude->ice_getIdentity()),
- mLogger(logger)
+ mLogger(logger),
+ mIndication(indication),
+ mIncludeConnected(includeConnected)
{
}
-void RingSessionOperation::operator()(const SessionWrapperPtr& session)
+void RelayIndication::operator()(const SessionWrapperPtr& session)
{
SessionPrx s = session->getSession();
- if (s->ice_getIdentity() != mExclude)
+ if (s->ice_getIdentity() != mExclude && (mIncludeConnected || !session->isConnected()))
{
try
{
//
// TODO: AMI.. or would this be better as a oneway. Do we care if we get a response etc?
//
- s->indicate(new AsteriskSCF::SessionCommunications::V1::RingIndication());
+ s->indicate(mIndication);
}
catch (const Ice::ObjectNotExistException& ex)
{
diff --git a/src/SessionOperations.h b/src/SessionOperations.h
index 1976059..cdf4b82 100644
--- a/src/SessionOperations.h
+++ b/src/SessionOperations.h
@@ -53,17 +53,21 @@ private:
AsteriskSCF::System::Logging::Logger mLogger;
};
-class RingSessionOperation : public std::unary_function<SessionWrapperPtr, void>
+class RelayIndication : public std::unary_function<SessionWrapperPtr, void>
{
public:
- RingSessionOperation(const AsteriskSCF::SessionCommunications::V1::SessionPrx& exclude,
- const AsteriskSCF::System::Logging::Logger& logger);
+ RelayIndication(const AsteriskSCF::SessionCommunications::V1::SessionPrx& exclude,
+ const AsteriskSCF::System::Logging::Logger& logger,
+ const AsteriskSCF::SessionCommunications::V1::IndicationPtr& indication,
+ bool includeConnected);
void operator()(const SessionWrapperPtr& session);
private:
Ice::Identity mExclude;
AsteriskSCF::System::Logging::Logger mLogger;
+ AsteriskSCF::SessionCommunications::V1::IndicationPtr mIndication;
+ bool mIncludeConnected;
};
class IfStateCriteria
diff --git a/src/SessionWrapper.cpp b/src/SessionWrapper.cpp
index c447486..883c614 100644
--- a/src/SessionWrapper.cpp
+++ b/src/SessionWrapper.cpp
@@ -18,6 +18,7 @@
#include <Ice/Ice.h>
#include <AsteriskSCF/logger.h>
#include "ServiceUtil.h"
+#include <set>
using namespace AsteriskSCF::System::Logging;
using namespace AsteriskSCF::SessionCommunications::V1;
@@ -28,6 +29,9 @@ using namespace std;
namespace
{
+
+typedef set<BridgedSessionState> StateSet;
+
//
// NOTE: This object is used in support of making connect() calls on sessions via AMI. It's a fair candidate for a
// singleton as you could pass the session as a cookie to the AMI call. However, it is desirable to have some logging,
@@ -124,7 +128,6 @@ protected:
void failed(const Ice::Exception& ex)
{
-
try
{
ex.ice_throw();
@@ -178,7 +181,6 @@ public:
protected:
bool executeImpl()
{
-
mSession->getSession()->begin_stop(mCode,
newCallback_Session_stop(this, &SessionStopTask::removed,
&SessionStopTask::failed));
@@ -241,6 +243,38 @@ private:
AsteriskSCF::Bridge::V1::BridgedSessionState mOldState;
};
+class SetStateFromTask : public QueuedTask
+{
+public:
+ SetStateFromTask(const SessionWrapperPtr& session,
+ const AsteriskSCF::Bridge::V1::BridgedSessionState newState,
+ const StateSet& startStates) :
+ QueuedTask("SetStateFromTask"),
+ mSession(session),
+ mState(newState),
+ mStartStates(startStates)
+ {
+ }
+
+protected:
+ bool executeImpl()
+ {
+ BridgedSessionState oldState = mSession->setState(mState);
+ if (mStartStates.find(oldState) == mStartStates.end())
+ {
+ mListener->failed();
+ return false;
+ }
+ return true;
+ }
+
+private:
+ SessionWrapperPtr mSession;
+ AsteriskSCF::Bridge::V1::BridgedSessionState mState;
+ AsteriskSCF::Bridge::V1::BridgedSessionState mOldState;
+ StateSet mStartStates;
+};
+
class ShutdownMediaTask : public QueuedTask
{
public:
@@ -250,25 +284,16 @@ public:
}
protected:
+ //
+ // NOTE: This has been changed from async to sync for the time being.. the unplugMedia
+ // call is inherently async, so we can just go on.
+ //
bool executeImpl()
{
mSession->unplugMedia();
- return false;
- }
-
- void done()
- {
- mListener->succeeded();
+ return true;
}
- void failed(const Ice::Exception&)
- {
- //
- // TODO: Log exception.
- //
- mListener->failed();
- }
-
private:
SessionWrapperPtr mSession;
};
@@ -306,26 +331,28 @@ QueuedTasks createShutdownTasks(const SessionWrapperPtr& session, const SessionL
const ResponseCodePtr& code)
{
//
- // Tasks are a queued, so they go in order that they will be processed.
+ // We use this task instead of changing states because it will prevent the
+ // queue from continuing if the session is already connected or done.
//
+ StateSet statesToContinueOn;
+ statesToContinueOn.insert(::AsteriskSCF::Bridge::V1::Added);
+ statesToContinueOn.insert(::AsteriskSCF::Bridge::V1::Connected);
+
QueuedTasks tasks;
- tasks.push_back(new SetStateTask(session, Disconnected));
+ tasks.push_back(new SetStateFromTask(session, ::AsteriskSCF::Bridge::V1::Disconnected, statesToContinueOn));
tasks.push_back(new RemoveBridgeTask(session, listener));
if (code)
{
+ tasks.push_back(new ShutdownMediaTask(session));
tasks.push_back(new SessionStopTask(session, code));
}
- //
- // TODO: These two tasks should be reversed really.
- //
- tasks.push_back(new ShutdownMediaTask(session));
return tasks;
}
QueuedTasks createSetupTasks(const SessionWrapperPtr& session)
{
QueuedTasks tasks;
- tasks.push_back(new SetStateTask(session, Connected));
+ tasks.push_back(new SetStateTask(session, ::AsteriskSCF::Bridge::V1::Connected));
tasks.push_back(new ConnectMediaTask(session));
return tasks;
}
@@ -532,19 +559,19 @@ void SessionWrapper::shutdown(const SessionListenerPrx& listener, const Response
// shutdownRunner->start();
}
-void SessionWrapper::setState(const AsteriskSCF::Bridge::V1::BridgedSessionState newState)
+AsteriskSCF::Bridge::V1::BridgedSessionState SessionWrapper::setState(const AsteriskSCF::Bridge::V1::BridgedSessionState newState)
{
mLogger(Debug) << FUNLOG << ": updating state " << mId;
BridgedSessionPtr copyOfNewState;
+ AsteriskSCF::Bridge::V1::BridgedSessionState oldState;
{
boost::unique_lock<boost::shared_mutex> lock(mLock);
- //
- // TODO:
- //
+ oldState = mSession->currentState;
mSession->currentState = newState;
copyOfNewState = createUpdate();
}
pushUpdate(copyOfNewState);
+ return oldState;
}
void SessionWrapper::unplugMedia()
diff --git a/src/SessionWrapper.h b/src/SessionWrapper.h
index 0c98112..e0a8835 100644
--- a/src/SessionWrapper.h
+++ b/src/SessionWrapper.h
@@ -127,7 +127,7 @@ public:
//
// TODO: Refactor so these methods don't need to be exposed.
//
- void setState(const AsteriskSCF::Bridge::V1::BridgedSessionState newState);
+ AsteriskSCF::Bridge::V1::BridgedSessionState setState(const AsteriskSCF::Bridge::V1::BridgedSessionState newState);
/**
* Disconnection helper.
diff --git a/test/TestBridging.cpp b/test/TestBridging.cpp
index a2ad429..f5569e9 100644
--- a/test/TestBridging.cpp
+++ b/test/TestBridging.cpp
@@ -420,6 +420,7 @@ public:
BOOST_CHECK(servant->createCalls() == 1);
+ IceUtil::ThreadControl::sleep(IceUtil::Time::seconds(2));
channel.commands()->getlog(idA, log);
bool findStop = find(log, "stop");
if (!findStop)
@@ -615,6 +616,7 @@ public:
BOOST_CHECK(bridgeListener->addedCount() == 2);
bridge->shutdown();
+ IceUtil::ThreadControl::sleep(IceUtil::Time::seconds(2));
BOOST_CHECK(servant->createCalls() == 1);
channel.commands()->getlog(idA, log);
bool findStop = find(log, "stop");
-----------------------------------------------------------------------
--
asterisk-scf/release/bridging.git
More information about the asterisk-scf-commits
mailing list