[asterisk-scf-commits] asterisk-scf/integration/bridging.git branch "async-bridging" updated.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Mon Mar 14 10:00:43 CDT 2011
branch "async-bridging" has been updated
via f86a76ed0e55bd3c1ddacb80d87b8f8384d22ed8 (commit)
via ed3ad2fb7cbebc110e9f6e537b4348e07bdb91b8 (commit)
from 14ba2849cb6bfb07c77d2d32512ea242fc9da87c (commit)
Summary of changes:
src/BridgeImpl.cpp | 30 +--
src/MediaSplicer.cpp | 657 +++++++++++++++++++++++++++++++++++---------
src/MediaSplicer.h | 44 +++-
src/ServiceUtil.h | 17 ++
src/SessionListener.cpp | 70 +++--
src/SessionOperations.cpp | 49 +---
src/SessionWrapper.cpp | 355 +++++++++++++++++++++++-
src/SessionWrapper.h | 32 ++-
8 files changed, 998 insertions(+), 256 deletions(-)
- Log -----------------------------------------------------------------
commit f86a76ed0e55bd3c1ddacb80d87b8f8384d22ed8
Author: Brent Eagles <beagles at digium.com>
Date: Mon Mar 14 12:30:23 2011 -0230
Modified several functions to be AMI.
diff --git a/src/BridgeImpl.cpp b/src/BridgeImpl.cpp
index 7b45558..5861bab 100644
--- a/src/BridgeImpl.cpp
+++ b/src/BridgeImpl.cpp
@@ -341,19 +341,7 @@ void BridgeImpl::removeSessions(const SessionSeq& sessions, const Ice::Current&
SessionWrapperPtr session = mSessions->getSession(*i);
if (session)
{
- try
- {
- //
- // TODO: AMI.
- //
- (*i)->removeBridge(mSessionListenerPrx);
- }
- catch (const Ice::Exception& ex)
- {
- mLogger(Info) << ": removing the bridge from " << (*i) << " threw "
- << ex.what();
- }
- session->disconnect();
+ session->shutdown(mSessionListenerPrx, new ResponseCode);
removedSessions.push_back(*i);
}
}
@@ -546,20 +534,8 @@ void BridgeImpl::replaceSession(const SessionPrx& oldSession, const SessionSeq&
{
throw SessionNotFound(oldSession);
}
-
- try
- {
- //
- // TODO: AMI.
- //
- oldSession->removeBridge(mSessionListenerPrx);
- }
- catch (const Ice::Exception& ex)
- {
- mLogger(Info) << ": removingthe bridge from " << oldSession << " threw "
- << ex.what();
- }
- session->disconnect();
+ session->shutdown(mSessionListenerPrx, new ResponseCode);
+
SessionSeq removed;
removed.push_back(oldSession);
mListeners->sessionsRemoved(removed);
diff --git a/src/MediaSplicer.cpp b/src/MediaSplicer.cpp
index 3268d98..83f9b0b 100644
--- a/src/MediaSplicer.cpp
+++ b/src/MediaSplicer.cpp
@@ -20,13 +20,14 @@
#include "BridgeReplicatorIf.h"
#include <string>
#include <vector>
+#include "ServiceUtil.h"
+#include "SessionWrapper.h"
using namespace AsteriskSCF::System::Logging;
using namespace AsteriskSCF::Media::V1;
using namespace AsteriskSCF::Bridge::V1;
using namespace std;
-
namespace AsteriskSCF
{
namespace BridgeService
@@ -39,6 +40,25 @@ class MedixMixerPtr;
typedef pair<AsteriskSCF::Media::V1::StreamSinkPrx, AsteriskSCF::Media::V1::StreamSourcePrx> OutgoingPairing;
typedef pair<AsteriskSCF::Media::V1::StreamSourcePrx, AsteriskSCF::Media::V1::StreamSinkPrx> IncomingPairing;
+typedef vector<OutgoingPairing> OutgoingPairings;
+typedef vector<IncomingPairing> IncomingPairings;
+
+class MediaConnectorI;
+typedef IceUtil::Handle<MediaConnectorI> MediaConnectorIPtr;
+class MediaSplicerI;
+typedef IceUtil::Handle<MediaSplicerI> MediaSplicerIPtr;
+
+struct MediaConnectorBuilder : public IceUtil::Shared
+{
+public:
+ AsteriskSCF::Media::V1::SessionPrx mediaSession;
+ AsteriskSCF::Media::V1::StreamSourceSeq sources;
+ AsteriskSCF::Media::V1::StreamSinkSeq sinks;
+ MediaConnectorIPtr peer;
+ OutgoingPairings outgoingPairings;
+ IncomingPairings incomingPairings;
+};
+typedef IceUtil::Handle<MediaConnectorBuilder> MediaConnectorBuilderPtr;
//
// TODO: These proxies could use some retry properties added to them...
@@ -48,17 +68,15 @@ typedef pair<AsteriskSCF::Media::V1::StreamSourcePrx, AsteriskSCF::Media::V1::St
class MediaConnectorI : public MediaConnector
{
public:
- MediaConnectorI(const AsteriskSCF::Media::V1::SessionPrx& media,
- const vector<OutgoingPairing>& outgoing, const vector<IncomingPairing>& incoming,
- const MediaConnectorPtr& peer,
- const string& bridgeId,
- const string& sessionId,
+ MediaConnectorI(const MediaConnectorBuilderPtr& data,
+ const std::string& bridgeId,
+ const std::string& sessionId,
const ReplicatorSmartPrx& replicator,
- const Logger& logger):
- mMedia(media),
- mOutgoing(outgoing),
- mIncoming(incoming),
- mPeer(peer),
+ const Logger& logger) :
+ mMedia(data->mediaSession),
+ mOutgoing(data->outgoingPairings),
+ mIncoming(data->incomingPairings),
+ mPeer(data->peer),
mConnected(true),
mBridgeId(bridgeId),
mSessionId(sessionId),
@@ -66,19 +84,9 @@ public:
mReplicator(replicator),
mLogger(logger)
{
- mLogger(Debug) << FUNLOG << ": setting up a media connector with " << outgoing.size()
- << " outgoing pairings and " << incoming.size() << " incoming pairings.";
+ mLogger(Debug) << FUNLOG << ": setting up a media connector with " << mOutgoing.size()
+ << " outgoing pairings and " << mIncoming.size() << " incoming pairings.";
mKey = mBridgeId + mSessionId + "media";
- for (vector<OutgoingPairing>::iterator i = mOutgoing.begin(); i != mOutgoing.end(); ++i)
- {
- i->second->setSink(i->first);
- i->first->setSource(i->second);
- }
- for (vector<IncomingPairing>::iterator i = mIncoming.begin(); i != mIncoming.end(); ++i)
- {
- i->first->setSink(i->second);
- i->second->setSource(i->first);
- }
}
MediaConnectorI(const SessionPairingPtr& pairing,
@@ -167,7 +175,8 @@ public:
mIncoming.clear();
}
- void update(const MediaConnectorPtr& peer, const vector<OutgoingPairing>& outgoing, const std::vector<IncomingPairing>& incoming)
+ void update(const MediaConnectorPtr& peer, const OutgoingPairings& outgoing,
+ const IncomingPairings& incoming)
{
mLogger(Debug) << FUNLOG << ": establishing a new peer connection.";
SessionPairingPtr update;
@@ -226,6 +235,78 @@ public:
pushUpdate(createUpdate());
}
+ //
+ // Like disconnect or unplug, but uses all oneway operations.
+ //
+ void destroy()
+ {
+ vector<OutgoingPairing> outgoing;
+ vector<IncomingPairing> incoming;
+ SessionPairingPtr update;
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ outgoing = mOutgoing;
+ mOutgoing.clear();
+ incoming = mIncoming;
+ mIncoming.clear();
+ mConnected = false;
+ update = createUpdate();
+ }
+ pushUpdate(update);
+ if (outgoing.size() == 0 && incoming.size() == 0)
+ {
+ return;
+ }
+ mLogger(Debug) << FUNLOG << ": unplugging sinks and sources";
+
+ //
+ // Disconnect everybody, eating up exceptions in case things have gone away. This is a perfect spot
+ // for oneways. We don't care about errors when destroying. We still catch Ice::Exceptions though since
+ // a connection might be attempted, even on behalf of a oneway.
+ //
+ for (vector<OutgoingPairing>::iterator i = outgoing.begin(); i != outgoing.end(); ++i)
+ {
+ mLogger(Debug) << FUNLOG << ": disconnecting " << i->second->ice_toString() << " and " << i->first->ice_toString();
+
+ try
+ {
+ tryOneWay(i->second)->setSink(0);
+ }
+ catch (const Ice::Exception& ex)
+ {
+ mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
+ }
+ try
+ {
+ tryOneWay(i->first)->setSource(0);
+ }
+ catch (const Ice::Exception& ex)
+ {
+ mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
+ }
+ }
+ for (vector<IncomingPairing>::iterator i = incoming.begin(); i != incoming.end(); ++i)
+ {
+ mLogger(Debug) << FUNLOG << ": disconnecting " << i->first->ice_toString() << " and " << i->second->ice_toString();
+ try
+ {
+ tryOneWay(i->first)->setSink(0);
+ }
+ catch (const Ice::Exception& ex)
+ {
+ mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
+ }
+ try
+ {
+ tryOneWay(i->second)->setSource(0);
+ }
+ catch (const Ice::Exception& ex)
+ {
+ mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
+ }
+ }
+ }
+
void update(const SessionPairingPtr& update)
{
boost::unique_lock<boost::shared_mutex> lock(mLock);
@@ -266,7 +347,7 @@ public:
//
// Disconnect everybody, eating up exceptions in case things have gone away. This is a perfect spot
- // for oneways or, at the very least, AMI.
+ // for oneways or, at the very least, AMI. XXX
//
for (vector<OutgoingPairing>::iterator i = outgoing.begin(); i != outgoing.end(); ++i)
{
@@ -332,16 +413,13 @@ private:
Logger mLogger;
};
-typedef IceUtil::Handle<MediaConnectorI> MediaConnectorIPtr;
-
+QueuedTasks createMediaConnectTasks(const SessionWrapperPtr& session, const MediaConnectorIPtr& peer, const MediaSplicerIPtr& splicer);
//
// TODO: This needs to register the streams with an active threaded mixing element.
//
class MediaSplicerI : public IceUtil::Shared
{
-
public:
-
MediaSplicerI(const Ice::CommunicatorPtr& communicator, const string& bridgeId, const ReplicatorSmartPrx& replicator, const Logger& logger) :
mCommunicator(communicator),
mBridgeId(bridgeId),
@@ -350,20 +428,23 @@ public:
{
}
- MediaConnectorPtr connect(const AsteriskSCF::SessionCommunications::V1::SessionPrx& session)
+ MediaConnectorPtr createConnector(const SessionWrapperPtr& session, const MediaConnectorBuilderPtr& data)
{
- AsteriskSCF::Media::V1::SessionPrx media = session->getMediaSession();
- if (!media)
+ MediaConnectorIPtr connector(new MediaConnectorI(data, mBridgeId, session->id(), mReplicator, mLogger));
+ if (data->peer)
{
- mLogger(Debug) << FUNLOG << ": no media available!";
- return 0;
+ data->peer->update(connector, data->outgoingPairings, data->incomingPairings);
}
+ connector->initialUpdate();
+ mSessions.push_back(MediaSessionStruct(data->mediaSession, connector));
+ return connector;
+ }
+ void connect(const SessionWrapperPtr& session)
+ {
boost::unique_lock<boost::shared_mutex> lock(mLock);
- MediaConnectorIPtr result;
-
//
- // Loop through looking for an identical media session, also reaping disconnected connectors as we go.
+ // Reap.
//
MediaSessions::iterator i = mSessions.begin();
while (i != mSessions.end())
@@ -375,58 +456,21 @@ public:
i = mSessions.erase(t);
continue;
}
- if (i->mediaSession == media)
- {
- result = MediaConnectorIPtr::dynamicCast(i->connector);
- assert(result);
- }
++i;
}
- if (result)
- {
- mLogger(Debug) << FUNLOG << ": found an existing matching connector, returning that!";
- return result;
- }
//
// PRE-ALPHA-ONLY code. When it's a two party bridge and the sessions are added or removed separately,
// one of them is going to have an empty connector.
//
-
- MediaConnectorPtr existing;
+ MediaConnectorIPtr existing;
if (mSessions.size() == 1)
{
existing = mSessions.back().connector;
existing->clearConnections();
}
-
- //
- // Idiom-wise, it is best to defer calling getSources() until after the sinks have all been
- // processed. However, we do not want to do the sink linkup if the getSources fails so we
- // do both calls up here.
- //
- StreamSinkSeq sinks = media->getSinks();
- StreamSourceSeq sources = media->getSources();
-
- vector<OutgoingPairing> outgoingPairings(findCompatiblePairings(sinks));
- vector<IncomingPairing> incomingPairings(findCompatiblePairings(sources));
-
- string sessionId = mCommunicator->identityToString(session->ice_getIdentity());
- result = new MediaConnectorI(media, outgoingPairings, incomingPairings, existing, mBridgeId, sessionId, mReplicator, mLogger);
- mLogger(Debug) << FUNLOG << ": established connections, returning connector object";
- mSessions.push_back(MediaSessionStruct(media, result));
- result->initialUpdate();
-
- //
- // PRE-ALPHA-ONLY code. When it's a two party bridge and the sessions are added or removed separately,
- // one of them is going to have an empty connector. Now that we are adding the new connector
- //
- if (existing)
- {
- MediaConnectorIPtr p = MediaConnectorIPtr::dynamicCast(existing);
- p->update(result, outgoingPairings, incomingPairings);
- }
- return result;
+ ExecutorPtr taskExecutor(new Executor(createMediaConnectTasks(session, existing, this)));
+ taskExecutor->start();
}
MediaConnectorPtr createReplica(const SessionPairingPtr& pairings)
@@ -458,6 +502,89 @@ public:
return result;
}
+ vector<OutgoingPairing> findCompatiblePairings(const StreamSinkSeq& sinks)
+ {
+ vector<StreamSourceSeq> allSources;
+
+ //
+ // TODO: This is not really correct at all. When the bridge implements conferencing chances, everything
+ // will come into a mix and go out.
+ //
+ for (MediaSessions::iterator i = mSessions.begin(); i != mSessions.end(); ++i)
+ {
+ allSources.push_back(i->mediaSession->getSources());
+ }
+
+ vector<OutgoingPairing> result;
+ for (StreamSinkSeq::const_iterator i = sinks.begin(); i != sinks.end(); ++i)
+ {
+ bool connected = false;
+ for (vector<StreamSourceSeq>::iterator j = allSources.begin(); j != allSources.end() && !connected; ++j)
+ {
+ for (StreamSourceSeq::iterator k = j->begin(); k != j->end(); ++k)
+ {
+ if (canAdapt(*i, *k))
+ {
+ result.push_back(OutgoingPairing(*i, *k));
+ connected = true;
+ StreamSourceSeq::iterator t = k;
+ j->erase(t);
+ break;
+ }
+ }
+ }
+ if (!connected)
+ {
+ //
+ // TODO: We couldn't establish a proper connection for this sink. What happens
+ // here is probably a policy thing.
+ //
+ }
+ }
+ return result;
+ }
+
+ vector<IncomingPairing> findCompatiblePairings(const StreamSourceSeq& sources)
+ {
+ vector<StreamSinkSeq> allSinks;
+ //
+ // TODO: This is not really correct at all. When the bridge implements conferencing chances, everything
+ // will come into a mix and go out.
+ //
+ for (MediaSessions::iterator i = mSessions.begin(); i != mSessions.end(); ++i)
+ {
+ allSinks.push_back(i->mediaSession->getSinks());
+ }
+
+ vector<IncomingPairing> result;
+ for (StreamSourceSeq::const_iterator i = sources.begin(); i != sources.end(); ++i)
+ {
+ bool connected = false;
+ for (vector<StreamSinkSeq>::iterator j = allSinks.begin(); j != allSinks.end() && !connected; ++j)
+ {
+ for (StreamSinkSeq::iterator k = j->begin(); k != j->end(); ++k)
+ {
+ if (canAdapt(*i, *k))
+ {
+ result.push_back(IncomingPairing(*i, *k));
+ StreamSinkSeq::iterator t = k;
+ j->erase(t);
+ connected = true;
+ break;
+ }
+ }
+ }
+ if (!connected)
+ {
+ //
+ // TODO: We couldn't establish a proper connection for this source! What happens now
+ // should be a matter of policy.
+ //
+ }
+ }
+ return result;
+ }
+
private:
boost::shared_mutex mLock;
@@ -469,9 +596,9 @@ private:
struct MediaSessionStruct
{
SessionPrx mediaSession;
- MediaConnectorPtr connector;
+ MediaConnectorIPtr connector;
- MediaSessionStruct(const SessionPrx& session, const MediaConnectorPtr& con) :
+ MediaSessionStruct(const SessionPrx& session, const MediaConnectorIPtr& con) :
mediaSession(session),
connector(con)
{
@@ -532,82 +659,346 @@ private:
//
return true;
}
+};
- vector<OutgoingPairing> findCompatiblePairings(const StreamSinkSeq& sinks)
+typedef IceUtil::Handle<MediaSplicerI> MediaSplicerIPtr;
+
+class GetMediaSession : public QueuedTask
+{
+public:
+ GetMediaSession(const SessionWrapperPtr& session, const MediaConnectorBuilderPtr& materials) :
+ mSession(session),
+ mMaterials(materials)
{
- vector<StreamSourceSeq> allSources;
- for (MediaSessions::iterator i = mSessions.begin(); i != mSessions.end(); ++i)
+ }
+
+protected:
+ bool executeImpl()
+ {
+ mSession->getSession()->begin_getMediaSession(
+ AsteriskSCF::SessionCommunications::V1::newCallback_Session_getMediaSession(this,
+ &GetMediaSession::done, &GetMediaSession::failed));
+ return false;
+ }
+
+ void done(const AsteriskSCF::Media::V1::SessionPrx& mediaSession)
+ {
+ mMaterials->mediaSession = mediaSession;
+ mListener->succeeded();
+ }
+
+ void failed(const Ice::Exception& ex)
+ {
+ mListener->failed();
+ }
+private:
+ SessionWrapperPtr mSession;
+ MediaConnectorBuilderPtr mMaterials;
+};
+
+class GetSinks : public QueuedTask
+{
+public:
+ GetSinks(const MediaConnectorBuilderPtr& materials) :
+ mMaterials(materials)
+ {
+ }
+
+protected:
+ bool executeImpl()
+ {
+ mMaterials->mediaSession->begin_getSinks(newCallback_Session_getSinks(this,
+ &GetSinks::done, &GetSinks::failed));
+ return false;
+ }
+
+ void done(const AsteriskSCF::Media::V1::StreamSinkSeq& sinks)
+ {
+ mMaterials->sinks = sinks;
+ mListener->succeeded();
+ }
+
+ void failed(const Ice::Exception&)
+ {
+ mListener->failed();
+ }
+private:
+ MediaConnectorBuilderPtr mMaterials;
+};
+
+class GetSources : public QueuedTask
+{
+public:
+ GetSources(const MediaConnectorBuilderPtr& materials) :
+ mMaterials(materials)
+ {
+ }
+
+protected:
+ bool executeImpl()
+ {
+ mMaterials->mediaSession->begin_getSources(newCallback_Session_getSources(this,
+ &GetSources::done, &GetSources::failed));
+ return false;
+ }
+
+ void done(const AsteriskSCF::Media::V1::StreamSourceSeq& sources)
+ {
+ mMaterials->sources = sources;
+ mListener->succeeded();
+ }
+
+ void failed(const Ice::Exception&)
+ {
+ mListener->failed();
+ }
+
+private:
+ MediaConnectorBuilderPtr mMaterials;
+};
+
+class GetCompatiblePairings : public QueuedTask
+{
+public:
+ GetCompatiblePairings(const MediaSplicerIPtr& splicer, const MediaConnectorBuilderPtr& materials) :
+ mSplicer(splicer),
+ mMaterials(materials)
+ {
+ }
+
+protected:
+ bool executeImpl()
+ {
+ mMaterials->incomingPairings = mSplicer->findCompatiblePairings(mMaterials->sources);
+ mMaterials->outgoingPairings = mSplicer->findCompatiblePairings(mMaterials->sinks);
+ return true;
+ }
+
+private:
+ MediaSplicerIPtr mSplicer;
+ MediaConnectorBuilderPtr mMaterials;
+};
+
+//
+// Handles setting up the connections using AMI. A little tricky to do in a minimal way, so enjoy. NOTE: It might
+// appear that this class is doing a little more bookkeeping than would be "minimal", but I've added a fair bit of
+// tracking with the hopes of implementing some rudimentary rollback in the future.
+//
+class MakeConnections : public QueuedTask
+{
+public:
+ MakeConnections(const MediaConnectorBuilderPtr& materials) :
+ mMaterials(materials),
+ mIncomingPairings(mMaterials->incomingPairings.size(), 0),
+ mOutgoingPairings(mMaterials->outgoingPairings.size(), 0),
+ mTotalCount(mIncomingPairings.size() + mOutgoingPairings.size())
+ {
+ }
+
+protected:
+
+ //
+ // Incoming and outgoing pairings follow the same logic, but use different types. The best way to code that kind of
+ // thing is with templates!
+ //
+ template <class T>
+ class PairingHelper : public IceUtil::Shared
+ {
+ public:
+ PairingHelper(const IceUtil::Handle<MakeConnections>& task, size_t position) :
+ mTask(task),
+ mPosition(position),
+ mSourceFlag(false),
+ mSinkFlag(false)
{
- allSources.push_back(i->mediaSession->getSources());
}
- vector<OutgoingPairing> result;
- for (StreamSinkSeq::const_iterator i = sinks.begin(); i != sinks.end(); ++i)
+ void setSinkCB()
{
- bool connected = false;
- for (vector<StreamSourceSeq>::iterator j = allSources.begin(); j != allSources.end() && !connected; ++j)
+ bool other = false;
{
- for (StreamSourceSeq::iterator k = j->begin(); k != j->end(); ++k)
- {
- if (canAdapt(*i, *k))
- {
- result.push_back(OutgoingPairing(*i, *k));
- connected = true;
- StreamSourceSeq::iterator t = k;
- j->erase(t);
- break;
- }
- }
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ mSinkFlag = true;
+ other = mSourceFlag;
}
- if (!connected)
+ if (other)
{
- //
- // TODO: We couldn't establish a proper connection for this sink. What happens
- // here is probably a policy thing.
- //
+ done();
}
}
- return result;
- }
- vector<IncomingPairing> findCompatiblePairings(const StreamSourceSeq& sources)
+ void setSourceCB()
+ {
+ bool other = false;
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ mSourceFlag = true;
+ other = mSinkFlag;
+ }
+ if (other)
+ {
+ done();
+ }
+ }
+
+ void failed(const Ice::Exception& ex)
+ {
+ mTask->failed(ex, mPosition);
+ }
+
+ void done();
+
+ private:
+
+ IceUtil::Handle<MakeConnections> mTask;
+ boost::shared_mutex mLock;
+ size_t mPosition;
+ bool mSourceFlag;
+ bool mSinkFlag;
+ };
+
+ typedef PairingHelper<OutgoingPairing> OutgoingHelper;
+ typedef PairingHelper<IncomingPairing> IncomingHelper;
+ typedef IceUtil::Handle<OutgoingHelper> OutgoingHelperPtr;
+ typedef IceUtil::Handle<IncomingHelper> IncomingHelperPtr;
+
+ bool executeImpl()
{
- vector<StreamSinkSeq> allSinks;
- for (MediaSessions::iterator i = mSessions.begin(); i != mSessions.end(); ++i)
+ //
+ // If there are lots of pairings, this could result in a "flurry" of AMI requests. I'm not sure that it is
+ // anything to worry about though. Ice will queue the requests and send them according to the "send" threads
+ // available.
+ //
+ size_t index = 0;
+ for (OutgoingPairings::iterator i = mMaterials->outgoingPairings.begin(); i != mMaterials->outgoingPairings.end(); ++i)
{
- allSinks.push_back(i->mediaSession->getSinks());
+ OutgoingHelperPtr h(new OutgoingHelper(this, index));
+ i->second->begin_setSink(i->first, AsteriskSCF::Media::V1::newCallback_StreamSource_setSink(h, &OutgoingHelper::setSinkCB, &OutgoingHelper::failed));
+ i->first->begin_setSource(i->second, AsteriskSCF::Media::V1::newCallback_StreamSink_setSource(h, &OutgoingHelper::setSourceCB, &OutgoingHelper::failed));
+ ++index;
}
+ index = 0;
+ for (IncomingPairings::iterator i = mMaterials->incomingPairings.begin(); i != mMaterials->incomingPairings.end(); ++i)
+ {
+ IncomingHelperPtr h(new IncomingHelper(this, index));
+ i->first->begin_setSink(i->second, AsteriskSCF::Media::V1::newCallback_StreamSource_setSink(h, &IncomingHelper::setSinkCB, &IncomingHelper::failed));
+ i->second->begin_setSource(i->first, AsteriskSCF::Media::V1::newCallback_StreamSink_setSource(h, &IncomingHelper::setSourceCB, &IncomingHelper::failed));
+ ++index;
+ }
+ return false;
+ }
- vector<IncomingPairing> result;
- for (StreamSourceSeq::const_iterator i = sources.begin(); i != sources.end(); ++i)
+ void incoming(size_t position)
+ {
+ bool done = false;
{
- bool connected = false;
- for (vector<StreamSinkSeq>::iterator j = allSinks.begin(); j != allSinks.end() && !connected; ++j)
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ if (mIncomingPairings.at(position) == 0)
{
- for (StreamSinkSeq::iterator k = j->begin(); k != j->end(); ++k)
- {
- if (canAdapt(*i, *k))
- {
- result.push_back(IncomingPairing(*i, *k));
- StreamSinkSeq::iterator t = k;
- j->erase(t);
- connected = true;
- break;
- }
- }
+ mIncomingPairings.at(position) = 1;
+ ++mCurrentCount;
+ done = (mCurrentCount == mTotalCount);
}
- if (!connected)
+ }
+ if (done)
+ {
+ mListener->succeeded();
+ }
+ }
+
+ void outgoing(size_t position)
+ {
+ bool done = false;
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ if (mOutgoingPairings.at(position) == 0)
{
- //
- // TODO: We couldn't establish a proper connection for this source! What happens now
- // should be a matter of policy.
- //
+ mOutgoingPairings.at(position) = 1;
+ ++mCurrentCount;
+ done = (mCurrentCount == mTotalCount);
}
}
- return result;
+ if (done)
+ {
+ mListener->succeeded();
+ }
+ }
+
+ void failed(const Ice::Exception& ex, size_t position)
+ {
+ //
+ // TODO: Do something special here probably. Its not clear that the whole operation should failed if a single
+ // pairing out of the whole group failed. At the very least, there should be a rollback.
+ //
+ mListener->failed();
+ }
+
+private:
+ boost::shared_mutex mLock;
+ MediaConnectorBuilderPtr mMaterials;
+
+ //
+ // Keeping track of the responses from incoming and outgoing pairings allow the total count to be used as a finished
+ // flag even in the event of duplicate returns. And as already indicated the mTotalCount lets us know when its time
+ // to say we are done.
+ //
+ Ice::IntSeq mIncomingPairings;
+ Ice::IntSeq mOutgoingPairings;
+ size_t mCurrentCount;
+ const size_t mTotalCount;
+};
+
+//
+// Specializations so the connections helper can keep track of the different sets of pairings/.
+//
+template <> void MakeConnections::PairingHelper<OutgoingPairing>::done()
+{
+ mTask->outgoing(mPosition);
+}
+
+template <> void MakeConnections::PairingHelper<IncomingPairing>::done()
+{
+ mTask->incoming(mPosition);
+}
+
+class CreateAndRegisterConnector : public QueuedTask
+{
+public:
+ CreateAndRegisterConnector(const SessionWrapperPtr& session, const MediaSplicerIPtr& splicer,
+ const MediaConnectorBuilderPtr& materials) :
+ mSession(session),
+ mSplicer(splicer),
+ mMaterials(materials)
+ {
}
+
+ bool executeImpl()
+ {
+ MediaConnectorPtr connector(mSplicer->createConnector(mSession, mMaterials));
+ mSession->setConnector(connector);
+ return true;
+ }
+
+private:
+ SessionWrapperPtr mSession;
+ MediaSplicerIPtr mSplicer;
+ MediaConnectorBuilderPtr mMaterials;
};
+QueuedTasks createMediaConnectTasks(const SessionWrapperPtr& session, const MediaConnectorIPtr& peer, const MediaSplicerIPtr& splicer)
+{
+ QueuedTasks tasks;
+ MediaConnectorBuilderPtr materials(new MediaConnectorBuilder);
+ materials->peer = peer;
+ tasks.push_front(new GetMediaSession(session, materials));
+ tasks.push_front(new GetSinks(materials));
+ tasks.push_front(new GetSources(materials));
+ tasks.push_front(new GetCompatiblePairings(splicer, materials));
+ tasks.push_front(new MakeConnections(materials));
+ tasks.push_front(new CreateAndRegisterConnector(session, splicer, materials));
+ return tasks;
+}
+
} // End of namespace BridgeService
} // End of namespace AsteriskSCF
@@ -619,11 +1010,7 @@ MediaSplicer::MediaSplicer(const Ice::CommunicatorPtr& comm, const std::string&
{
}
-MediaSplicer::~MediaSplicer()
-{
-}
-
-MediaConnectorPtr MediaSplicer::connect(const AsteriskSCF::SessionCommunications::V1::SessionPrx& session)
+void MediaSplicer::connect(const SessionWrapperPtr& session)
{
return mImpl->connect(session);
}
diff --git a/src/MediaSplicer.h b/src/MediaSplicer.h
index 08443d8..65108f1 100644
--- a/src/MediaSplicer.h
+++ b/src/MediaSplicer.h
@@ -27,36 +27,66 @@ namespace AsteriskSCF
{
namespace BridgeService
{
+
+//
+// Forward declarations.
+//
+class SessionWrapper;
+typedef IceUtil::Handle<SessionWrapper> SessionWrapperPtr;
+
+/**
+ *
+ * Interface for MediaConnector objects.
+ *
+ **/
class MediaConnector : public IceUtil::Shared
{
public:
virtual ~MediaConnector() {}
+
+ /**
+ * Disconnects the media pairings associated with this connector. It will involve the
+ * peer media connector, if one exists.
+ **/
virtual void unplug() = 0;
+
virtual bool isConnected() = 0;
+
+ /**
+ * Simply "forget" the pairings. Used when this connector's complement has been disconnect already.
+ **/
virtual void clearConnections() = 0;
+
+ /**
+ * Nicely disconnect the media pairings owned by this connector.
+ **/
virtual bool disconnectMedia() = 0;
+
+ /**
+ * Best effort, fire-forget disconnect the media pairings.
+ **/
+ virtual void destroy() = 0;
+ /**
+ * Replication support function.
+ **/
virtual void update(const AsteriskSCF::Bridge::V1::SessionPairingPtr& update) = 0;
};
-
typedef IceUtil::Handle<MediaConnector> MediaConnectorPtr;
-
+
class MediaSplicerI;
-
class MediaSplicer : public IceUtil::Shared
{
public:
MediaSplicer(const Ice::CommunicatorPtr& comm, const std::string& bridgeId, const ReplicatorSmartPrx& replicator,
const AsteriskSCF::System::Logging::Logger& logger);
- ~MediaSplicer();
- MediaConnectorPtr connect(const AsteriskSCF::SessionCommunications::V1::SessionPrx& session);
-
+
+ void connect(const SessionWrapperPtr& session);
MediaConnectorPtr createReplica(const AsteriskSCF::Bridge::V1::SessionPairingPtr& pairings);
private:
boost::shared_ptr<MediaSplicerI> mImpl;
};
-
typedef IceUtil::Handle<MediaSplicer> MediaSplicerPtr;
} // End of namespace BridgeService
diff --git a/src/SessionListener.cpp b/src/SessionListener.cpp
index 280c49e..c5a61b3 100644
--- a/src/SessionListener.cpp
+++ b/src/SessionListener.cpp
@@ -57,8 +57,12 @@ public:
return;
}
+#if 0
session->setConnected();
session->setupMedia();
+#endif
+ session->setup();
+
ConnectSessionOperation connector(source, mLogger);
mSessions->visitSessions(connector);
}
@@ -71,19 +75,31 @@ public:
void flashed(const SessionPrx& source, const Ice::Current&)
{
+ //
+ // TODO: Is there something to be done here?
+ //
}
void held(const SessionPrx& source, const Ice::Current&)
{
+ //
+ // TODO: Should this affect the media operations.
+ //
}
void progressing(const SessionPrx& source,
const ResponseCodePtr& response, const Ice::Current&)
{
+ //
+ // TODO: Is there something to be done here?
+ //
}
void ringing(const SessionPrx& source, const Ice::Current&)
{
+ //
+ // TODO: Who gets the ring notifications will likely depend on configuration, etc.
+ //
RingSessionOperation ringer(source, mLogger);
mSessions->visitSessions(ringer);
}
@@ -93,34 +109,38 @@ public:
string proxyString = source->ice_toString();
mLogger(Debug) << FUNLOG << ": session stopped " << proxyString;
+ //
+ // IMPLNOTE: The AMI approach.
+ // Stack some operations in a callback. In this case it will be:
+ // 1. getSession()
+ // 2. removeBridge()
+ // 3. disconnect() or destroy()
+ //
+ // The latter two should be coupled as a transaction. One way to go about this would be to replicate the
+ // operations that need to be done together and checkpoint after each one completes. Actually, there are lots of
+ // ways to go about it. Need to think on this more.
+ //
+ // Q. What is the proper order of operations here, should removeBridge be called before or after we
+ // "disconnect" the session?
+ //
SessionWrapperPtr session;
- try
- {
- session = mSessions->getSession(source);
- if (!session)
- {
- mLogger(Info) << "Attempt to respond to connected notification for session with proxy "
- << proxyString << " that does not match known sessions. Possible out of order operations.";
- return;
- }
-
- //
- // We don't actually have the proxy for this object, but we can create one on the fly.
- //
- SessionListenerPrx listenerPrx = SessionListenerPrx::uncheckedCast(current.adapter->createProxy(current.id));
- source->removeBridge(listenerPrx);
- session->disconnect();
- }
- catch (const Ice::ObjectNotExistException& ex)
+ session = mSessions->getSession(source);
+ if (!session)
{
- session->destroy();
- mLogger(Debug) << FUNLOG << ": " << ex.what();
- }
- catch (const Ice::Exception& ex)
- {
- mLogger(Debug) << FUNLOG << ": " << ex.what();
- throw;
+ mLogger(Info) << "Attempt to respond to connected notification for session with proxy "
+ << proxyString << " that does not match known sessions. Possible out of order operations.";
+ return;
}
+
+ //
+ // We don't actually have the proxy for this object, but we can create one on the fly.
+ //
+ SessionListenerPrx listenerPrx = SessionListenerPrx::uncheckedCast(current.adapter->createProxy(current.id));
+
+ //
+ // Shutdown is handled asynchronously, so there won't be any exceptions that need to be caught here.
+ //
+ session->shutdown(listenerPrx, new ResponseCode);
}
void unheld(const SessionPrx& source, const Ice::Current&)
diff --git a/src/SessionOperations.cpp b/src/SessionOperations.cpp
index 279bc96..4ae2ab9 100644
--- a/src/SessionOperations.cpp
+++ b/src/SessionOperations.cpp
@@ -33,26 +33,10 @@ void ConnectSessionOperation::operator()(const SessionWrapperPtr& s)
{
if (s->getSession()->ice_getIdentity() != mExclude)
{
- try
- {
- //
- // TODO: AMI!
- //
- s->connect();
- }
- catch (const Ice::ObjectNotExistException& ex)
- {
- //
- // If the object has indeed been destroyed, we tell the wrapper that it should
- // go away and it takes care of making sure that the right things happen.
- //
- s->destroy();
- mLogger(Debug) << FUNLOG << ": " << ex.what();
- }
- catch (const Ice::Exception& ex)
- {
- mLogger(Debug) << FUNLOG << ": " << ex.what();
- }
+ //
+ // This is ultimately implemented as an AMI request, so this will return pretty much immediately.
+ //
+ s->connect();
}
}
@@ -67,28 +51,7 @@ ShutdownSessionOperation::ShutdownSessionOperation(const SessionListenerPrx& lis
void ShutdownSessionOperation::operator()(const SessionWrapperPtr& wrapper)
{
- SessionPrx s = wrapper->getSession();
- try
- {
- //
- // TODO: Make AMI.
- //
- s->removeBridge(mListener);
- wrapper->disconnect();
- s->stop(mResponse);
- }
- catch (const Ice::ObjectNotExistException& ex)
- {
- //
- // Just make sure we are being cleaned up properly.
- //
- wrapper->destroy();
- mLogger(Debug) << FUNLOG << ": " << ex.what();
- }
- catch (const Ice::Exception& ex)
- {
- mLogger(Debug) << FUNLOG << ": " << ex.what();
- }
+ wrapper->shutdown(mListener, mResponse);
}
RingSessionOperation::RingSessionOperation(const SessionPrx& exclude, const Logger& logger) :
@@ -105,7 +68,7 @@ void RingSessionOperation::operator()(const SessionWrapperPtr& session)
try
{
//
- // TODO: AMI!
+ // TODO: AMI.. or would this be better as a oneway. Do we care if we get a response etc?
//
s->ring();
}
diff --git a/src/SessionWrapper.cpp b/src/SessionWrapper.cpp
index 5c8b13f..7c618c9 100644
--- a/src/SessionWrapper.cpp
+++ b/src/SessionWrapper.cpp
@@ -19,8 +19,6 @@
#include <AsteriskSCF/logger.h>
#include "ServiceUtil.h"
-#include <boost/thread/once.hpp>
-
using namespace AsteriskSCF::System::Logging;
using namespace AsteriskSCF::SessionCommunications::V1;
using namespace AsteriskSCF::BridgeService;
@@ -74,6 +72,7 @@ public:
// XXX- it might be reasonable to disconnect this session for this!
//
mLogger(Warning) << "exception when calling connect() on " << mSession->id() << ": " << ex.what();
+ mSession->destroy();
}
catch (const Ice::SocketException& ex)
{
@@ -89,12 +88,214 @@ public:
//
mLogger(Warning) << "exception when calling connect() on " << mSession->id() << ": " << ex.what();
}
- }
+ }
private:
SessionWrapperPtr mSession;
Logger mLogger;
};
+
+//
+// Micro tasks
+//
+class RemoveBridgeTask : public QueuedTask
+{
+public:
+ RemoveBridgeTask(const SessionWrapperPtr& session, const SessionListenerPrx& listener) :
+ mSession(session),
+ mSessionListener(listener)
+ {
+ }
+
+protected:
+ bool executeImpl()
+ {
+ mSession->getSession()->begin_removeBridge(mSessionListener,
+ newCallback_Session_removeBridge(this, &RemoveBridgeTask::removed,
+ &RemoveBridgeTask::failed));
+ return false;
+ }
+
+ void removed()
+ {
+ mListener->succeeded();
+ }
+
+ void failed(const Ice::Exception& ex)
+ {
+ mListener->failed();
+ }
+
+ void failImpl()
+ {
+ //
+ // NO-OP
+ //
+ }
+
+ void destroyImpl()
+ {
+ //
+ // NO-OP
+ //
+ }
+
+private:
+ SessionWrapperPtr mSession;
+ SessionListenerPrx mSessionListener;
+};
+
+class SessionStopTask : public QueuedTask
+{
+public:
+ SessionStopTask(const SessionWrapperPtr& session,
+ const ResponseCodePtr& code) :
+ mSession(session),
+ mCode(code)
+ {
+ }
+
+protected:
+ bool executeImpl()
+ {
+
+ mSession->getSession()->begin_stop(mCode,
+ newCallback_Session_stop(this, &SessionStopTask::removed,
+ &SessionStopTask::failed));
+ return false;
+ }
+
+ void removed()
+ {
+ mListener->succeeded();
+ }
+
+ void failed(const Ice::Exception&)
+ {
+ mListener->failed();
+ }
+
+ void failImpl()
+ {
+ //
+ // NO-OP
+ //
+ }
+
+ void destroyImpl()
+ {
+ //
+ // NO-OP
+ //
+ }
+
+private:
+ SessionWrapperPtr mSession;
+ ResponseCodePtr mCode;
+};
+
+class SetStateTask : public QueuedTask
+{
+public:
+ SetStateTask(const SessionWrapperPtr& session,
+ const AsteriskSCF::Bridge::V1::BridgedSessionState newState) :
+ mSession(session),
+ mState(newState)
+ {
+ }
+
+protected:
+ bool executeImpl()
+ {
+ mSession->setState(mState);
+ return true;
+ }
+
+private:
+ SessionWrapperPtr mSession;
+ AsteriskSCF::Bridge::V1::BridgedSessionState mState;
+ AsteriskSCF::Bridge::V1::BridgedSessionState mOldState;
+};
+
+class ShutdownMediaTask : public QueuedTask
+{
+public:
+ ShutdownMediaTask(const SessionWrapperPtr& session) :
+ mSession(session)
+ {
+ }
+
+protected:
+ bool executeImpl()
+ {
+ mSession->unplugMedia();
+ return false;
+ }
+
+ void done()
+ {
+ mListener->succeeded();
+ }
+
+ void failed(const Ice::Exception&)
+ {
+ mListener->failed();
+ }
+
+private:
+ SessionWrapperPtr mSession;
+};
+
+class ConnectMediaTask : public QueuedTask
+{
+public:
+ ConnectMediaTask(const SessionWrapperPtr& session) :
+ mSession(session)
+ {
+ }
+
+protected:
+ bool executeImpl()
+ {
+ mSession->setupMedia();
+ return true;
+ }
+
+ void done()
+ {
+ mListener->succeeded();
+ }
+
+ void failed(const Ice::Exception&)
+ {
+ mListener->failed();
+ }
+private:
+ SessionWrapperPtr mSession;
+};
+
+QueuedTasks createShutdownTasks(const SessionWrapperPtr& session, const SessionListenerPrx& listener,
+ const ResponseCodePtr& code)
+{
+ //
+ // Tasks are a queued, so they go in order that they will be processed.
+ //
+ QueuedTasks tasks;
+ tasks.push_front(new SetStateTask(session, BridgedSessionState::Disconnected));
+ tasks.push_front(new RemoveBridgeTask(session, listener));
+ tasks.push_front(new ShutdownMediaTask(session));
+ tasks.push_front(new SessionStopTask(session, code));
+ return tasks;
+}
+
+QueuedTasks createSetupTasks(const SessionWrapperPtr& session)
+{
+ QueuedTasks tasks;
+ tasks.push_front(new SetStateTask(session, BridgedSessionState::Connected));
+ tasks.push_front(new ConnectMediaTask(session));
+ return tasks;
+}
+
}
SessionWrapper::SessionWrapper(const BridgedSessionPtr& session,
@@ -104,7 +305,8 @@ SessionWrapper::SessionWrapper(const BridgedSessionPtr& session,
mReplicator(replicator),
mLogger(logger),
mId(mSession->key),
- mSplicer(splicer)
+ mSplicer(splicer),
+ mActivities(new Executor)
{
}
@@ -128,7 +330,7 @@ void SessionWrapper::connect()
mSession->session->begin_connect(
newCallback_Session_connect(new ConnectRequestCallback(this, mLogger),
&ConnectRequestCallback::connectCB,
- &ConnectRequestCallback::failureCB), this);
+ &ConnectRequestCallback::failureCB));
}
}
@@ -136,7 +338,7 @@ void SessionWrapper::ring()
{
{
boost::shared_lock<boost::shared_mutex> lock(mLock);
- if (mSession->currentState == BridgedSessionState::Connected)
+ if (mSession->currentState != BridgedSessionState::Added)
{
return;
}
@@ -180,10 +382,7 @@ void SessionWrapper::setupMedia()
boost::unique_lock<boost::shared_mutex> lock(mLock);
if (mSession->currentState == BridgedSessionState::Added)
{
- if (!mConnector)
- {
- mConnector = mSplicer->connect(mSession->session);
- }
+ mSplicer->connect(this);
mSession->currentState = BridgedSessionState::Connected;
update = createUpdate();
}
@@ -191,6 +390,15 @@ void SessionWrapper::setupMedia()
pushUpdate(update);
}
+void SessionWrapper::setConnector(const MediaConnectorPtr& connector)
+{
+ mLogger(Debug) << FUNLOG << " for " << mId;
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ mConnector = connector;
+ }
+}
+
void SessionWrapper::updateMedia(const SessionPairingPtr& pairings)
{
mLogger(Debug) << FUNLOG << " for " << mId;
@@ -245,6 +453,10 @@ void SessionWrapper::destroy()
boost::unique_lock<boost::shared_mutex> lock(mLock);
if (mSession->currentState != BridgedSessionState::Done)
{
+ //
+ // TODO: This should be a best effort attempt to unplug the media. This might mean some
+ // oneways along with some exception wherever the disconnections are actually taking place.
+ //
unplugMedia();
mSession->currentState = BridgedSessionState::Done;
update = createUpdate();
@@ -264,14 +476,39 @@ string SessionWrapper::id()
return mId;
}
-void SessionWrapper::pushUpdate(const BridgedSessionPtr& update)
+void SessionWrapper::setup()
{
- if (update && mReplicator)
+ mLogger(Debug) << FUNLOG << ": starting setup of connected session " << mId;
+ mActivities->append(new QueueableExecutor(createSetupTasks(this)));
+}
+
+void SessionWrapper::shutdown(const SessionListenerPrx& listener, const ResponseCodePtr& code)
+{
+ mLogger(Debug) << FUNLOG << ": beginning shutdown of " << mId;
+ QueuedTaskPtr shutdownRunner(new QueueableExecutor(createShutdownTasks(this, listener, code)));
+ //
+ // TODO: determine if the pending activites should be shutdown first.
+ //
+ mActivities->append(shutdownRunner);
+ //
+ // If shutdown is going to preempt stuff we need to uncomment this.
+ //
+ // shutdownRunner->start();
+}
+
+void SessionWrapper::setState(const AsteriskSCF::Bridge::V1::BridgedSessionState newState)
+{
+ mLogger(Debug) << FUNLOG << ": updating state " << mId;
+ BridgedSessionPtr update;
{
- ReplicatedStateItemSeq seq;
- seq.push_back(update);
- mReplicator->setState(seq);
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ //
+ // TODO:
+ //
+ mSession->currentState = newState;
+ update = createUpdate();
}
+ pushUpdate(update);
}
void SessionWrapper::unplugMedia()
@@ -289,6 +526,16 @@ void SessionWrapper::unplugMedia()
}
}
+void SessionWrapper::pushUpdate(const BridgedSessionPtr& update)
+{
+ if (update && mReplicator)
+ {
+ ReplicatedStateItemSeq seq;
+ seq.push_back(update);
+ mReplicator->setState(seq);
+ }
+}
+
BridgedSessionPtr SessionWrapper::createUpdate()
{
//
diff --git a/src/SessionWrapper.h b/src/SessionWrapper.h
index c63fdb4..52ad6cd 100644
--- a/src/SessionWrapper.h
+++ b/src/SessionWrapper.h
@@ -20,6 +20,7 @@
#include <AsteriskSCF/logger.h>
#include <boost/thread/locks.hpp>
#include "MediaSplicer.h"
+#include "Tasks.h"
namespace AsteriskSCF
{
@@ -85,6 +86,7 @@ public:
* Initiates replication.
**/
void setupMedia();
+ void setConnector(const MediaConnectorPtr& connector);
void updateMedia(const AsteriskSCF::Bridge::V1::SessionPairingPtr& pairings);
/**
@@ -111,6 +113,27 @@ public:
bool isDestroyed();
std::string id();
+
+ //
+ // Large scale macro operations. In effect the wrapper becomes a sub-component within the bridge.
+ //
+ // TODO: Should replace the more granular operations in the interface.
+ //
+ void setup();
+
+ void shutdown(const AsteriskSCF::SessionCommunications::V1::SessionListenerPrx& listener,
+ const AsteriskSCF::SessionCommunications::V1::ResponseCodePtr& code);
+
+
+ //
+ // TODO: Refactor so these methods don't need to be exposed.
+ //
+ void setState(const AsteriskSCF::Bridge::V1::BridgedSessionState newState);
+
+ /**
+ * Disconnection helper.
+ **/
+ void unplugMedia();
private:
@@ -121,17 +144,14 @@ private:
AsteriskSCF::System::Logging::Logger mLogger;
std::string mId;
MediaSplicerPtr mSplicer;
-
+ ExecutorPtr mActivities;
+
/**
* Sends changes to the replication service. This should never occur
* unless the host service is active.
**/
void pushUpdate(const AsteriskSCF::Bridge::V1::BridgedSessionPtr& update);
- /**
- * Disconnection helper.
- **/
- void unplugMedia();
AsteriskSCF::Bridge::V1::BridgedSessionPtr createUpdate();
};
commit ed3ad2fb7cbebc110e9f6e537b4348e07bdb91b8
Author: Brent Eagles <beagles at digium.com>
Date: Thu Mar 10 11:12:08 2011 -0330
Make Sesson:connected() calls AMI.
diff --git a/src/ServiceUtil.h b/src/ServiceUtil.h
index 36610b9..f2b4917 100644
--- a/src/ServiceUtil.h
+++ b/src/ServiceUtil.h
@@ -170,4 +170,21 @@ private:
size_t mCounter;
};
+//
+// Helper template to try and create a one way proxy. If the endpoint configuration on the proxy will not allow
+// oneways, the original proxy is returned.
+//
+template <typename PrxType>
+PrxType tryOneWay(const PrxType& orig)
+{
+ try
+ {
+ return PrxType::uncheckedCast(orig->ice_oneway());
+ }
+ catch (const Ice::NoEndpointException& ex)
+ {
+ }
+ return orig;
+}
+
}; /** End of namespace AsteriskSCF */
diff --git a/src/SessionWrapper.cpp b/src/SessionWrapper.cpp
index 21bca8d..5c8b13f 100644
--- a/src/SessionWrapper.cpp
+++ b/src/SessionWrapper.cpp
@@ -17,6 +17,9 @@
#include "SessionWrapper.h"
#include <Ice/Ice.h>
#include <AsteriskSCF/logger.h>
+#include "ServiceUtil.h"
+
+#include <boost/thread/once.hpp>
using namespace AsteriskSCF::System::Logging;
using namespace AsteriskSCF::SessionCommunications::V1;
@@ -25,6 +28,75 @@ using namespace AsteriskSCF::Bridge::V1;
using namespace AsteriskSCF;
using namespace std;
+namespace
+{
+//
+// NOTE: This object is used in support of making connect() calls on sessions via AMI. It's a fair candidate for a
+// singleton as you could pass the session as a cookie to the AMI call. However, it is desirable to have some logging,
+// and initializating the singleton from the proper logger instance is a little problematic to do in a fashion
+// consistent with how the rest of the service uses a logger. An alternative would be to have the actual implementation
+// of the callbacks be implemented by the SessionWrapper, which already has the logger instance. I will leave this as is
+// for now as introducing callback interfaces on the wrapper for AMI callback purposes seems overkill for the moment. So
+// perhaps this should be labelled REVISIT.
+//
+class ConnectRequestCallback : public IceUtil::Shared
+{
+public:
+ ConnectRequestCallback(const SessionWrapperPtr& session, const Logger& logger):
+ mSession(session),
+ mLogger(logger)
+ {
+ }
+
+ void connectCB()
+ {
+ //
+ // There isn't rally anything to do on success.
+ //
+ mLogger(Debug) << "connect call on " << mSession->id() << " succeeded.";
+ }
+
+ void failureCB(const Ice::Exception& ex)
+ {
+ //
+ // Failure is a little more interesting. We should really retry on some exceptions, and
+ // auto-disconnect the session for others.
+ //
+ try
+ {
+ ex.ice_throw();
+ }
+ catch (const Ice::RequestFailedException& ex)
+ {
+ //
+ // These exceptions indicate something is wrong with the request on this object. We should not expect that
+ // it could ever exceed. ObjectNotExistException is included in this catch.
+ // XXX- it might be reasonable to disconnect this session for this!
+ //
+ mLogger(Warning) << "exception when calling connect() on " << mSession->id() << ": " << ex.what();
+ }
+ catch (const Ice::SocketException& ex)
+ {
+ //
+ // A possible retry scenario.
+ //
+ mLogger(Warning) << "exception when calling connect() on " << mSession->id() << ": " << ex.what();
+ }
+ catch (const std::exception& ex)
+ {
+ //
+ // Not much to be done here except log the error and move on.
+ //
+ mLogger(Warning) << "exception when calling connect() on " << mSession->id() << ": " << ex.what();
+ }
+ }
+
+private:
+ SessionWrapperPtr mSession;
+ Logger mLogger;
+};
+}
+
SessionWrapper::SessionWrapper(const BridgedSessionPtr& session,
const MediaSplicerPtr& splicer, const ReplicatorSmartPrx& replicator,
const Logger& logger) :
@@ -46,7 +118,17 @@ void SessionWrapper::connect()
{
if (setConnected())
{
- mSession->session->connect();
+ //
+ // A word about expected context. In a conference bridge, it is quite possible that multiple sessions will have
+ // connect() called on them "at once". By using AMI to send these requests out, we free the
+ // SessionWrapper::connect() caller alone to proceed. In a sense, each individual session wrapper instance takes
+ // care of dealing with the AMI callbacks. More accurately, there is a class wide singleton for the callback
+ // object and the SessionWrapper itself is passed as the cookie. No further "book-keeping" is required.
+ //
+ mSession->session->begin_connect(
+ newCallback_Session_connect(new ConnectRequestCallback(this, mLogger),
+ &ConnectRequestCallback::connectCB,
+ &ConnectRequestCallback::failureCB), this);
}
}
@@ -59,7 +141,7 @@ void SessionWrapper::ring()
return;
}
}
- mSession->session->ring();
+ tryOneWay(mSession->session)->ring();
}
bool SessionWrapper::setConnected()
diff --git a/src/SessionWrapper.h b/src/SessionWrapper.h
index e11da7c..c63fdb4 100644
--- a/src/SessionWrapper.h
+++ b/src/SessionWrapper.h
@@ -26,7 +26,7 @@ namespace AsteriskSCF
namespace BridgeService
{
-class SessionWrapper : public IceUtil::Shared
+class SessionWrapper : public Ice::LocalObject
{
public:
SessionWrapper(const AsteriskSCF::Bridge::V1::BridgedSessionPtr& session,
-----------------------------------------------------------------------
--
asterisk-scf/integration/bridging.git
More information about the asterisk-scf-commits
mailing list