[asterisk-scf-commits] asterisk-scf/release/bridging.git branch "master" updated.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Sun Sep 4 11:04:02 CDT 2011
branch "master" has been updated
via 7ae703dcaf863f700b577ec6796e3ef81bbb0ebf (commit)
from 8b9064121d04e22ab3277b8549f0ce8c6e2f69a2 (commit)
Summary of changes:
.../BridgeService/BridgeReplicatorIf.ice | 2 +-
src/BridgeImpl.cpp | 138 ++++++-
src/MediaSplicer.cpp | 438 ++++++++------------
src/MediaSplicer.h | 5 +
src/SessionOperations.cpp | 123 ++++++
src/SessionOperations.h | 41 ++
src/SessionWrapper.cpp | 23 +
src/SessionWrapper.h | 13 +
8 files changed, 527 insertions(+), 256 deletions(-)
- Log -----------------------------------------------------------------
commit 7ae703dcaf863f700b577ec6796e3ef81bbb0ebf
Author: Joshua Colp <jcolp at digium.com>
Date: Sun Sep 4 13:08:41 2011 -0300
Add support for multiple streams using the SessionController interface.
diff --git a/slice/AsteriskSCF/Replication/BridgeService/BridgeReplicatorIf.ice b/slice/AsteriskSCF/Replication/BridgeService/BridgeReplicatorIf.ice
index 5a7892c..fb0b7ec 100644
--- a/slice/AsteriskSCF/Replication/BridgeService/BridgeReplicatorIf.ice
+++ b/slice/AsteriskSCF/Replication/BridgeService/BridgeReplicatorIf.ice
@@ -97,7 +97,7 @@ class SessionPairing extends ReplicatedStateItem
{
string bridgeKey;
string sessionKey;
- AsteriskSCF::Media::V1::Session* mediaSession;
+ AsteriskSCF::Media::V1::StreamInformationDict streams;
MediaPairingSeq incomingMediaPairings;
MediaPairingSeq outgoingMediaPairings;
};
diff --git a/src/BridgeImpl.cpp b/src/BridgeImpl.cpp
index a9cd9e1..a7b2feb 100755
--- a/src/BridgeImpl.cpp
+++ b/src/BridgeImpl.cpp
@@ -78,7 +78,6 @@ public:
// AsteriskSCF::SessionCommunications::Bridging::Bridge Interface
//
void addSessions_async(const AMD_Bridge_addSessionsPtr& callback, const SessionSeq& sessions, const Ice::Current&);
- void removeSessions(const SessionSeq& sessions, const Ice::Current& current);
void removeSessions_async(const AMD_Bridge_removeSessionsPtr& callback, const SessionSeq& sessions,
const Ice::Current&);
@@ -381,6 +380,140 @@ private:
SessionsTrackerPtr mTracker;
};
+class BridgeSessionController : public SessionController
+{
+public:
+ BridgeSessionController(const BridgeImplPtr& bridge,
+ const SessionWrapperPtr& self,
+ const Logger& logger) : mBridge(bridge), mSelf(self), mLogger(logger) { }
+
+ void changeStreamStates_async(const AsteriskSCF::SessionCommunications::V1::AMD_SessionController_changeStreamStatesPtr& cb,
+ const AsteriskSCF::Media::V1::StreamStateDict&, const Ice::Current&)
+ {
+ // We do not care about stream state changes at this point in time
+ cb->ice_response();
+ }
+
+ void addStreams_async(const AsteriskSCF::SessionCommunications::V1::AMD_SessionController_addStreamsPtr& cb,
+ const AsteriskSCF::Media::V1::StreamInformationDict& streams, const Ice::Current&)
+ {
+ AddStreamsOperationPtr op = new AddStreamsOperation(cb, mSelf, streams, mLogger);
+ mBridge->getSessions()->visitSessions(*op);
+ }
+
+ void removeStreams_async(const AsteriskSCF::SessionCommunications::V1::AMD_SessionController_removeStreamsPtr& cb,
+ const AsteriskSCF::Media::V1::StreamInformationDict& streams, const Ice::Current&)
+ {
+ RemoveStreamsOperation op(mSelf, streams);
+ mBridge->getSessions()->visitSessions(op);
+ cb->ice_response();
+ }
+
+private:
+ BridgeImplPtr mBridge;
+ SessionWrapperPtr mSelf;
+ Logger mLogger;
+};
+
+class SetAndGetSessionControllerTask : public QueuedTask
+{
+public:
+ SetAndGetSessionControllerTask(const Ice::ObjectAdapterPtr& adapter,
+ const SessionCollectionPtr& sessionCollection,
+ const SessionSeq& sessions,
+ const BridgeImplPtr& bridge,
+ const Logger& logger):
+ QueuedTask("SetAndGetSessionControllerTask"),
+ mAdapter(adapter),
+ mSessionManager(sessionCollection),
+ mSessions(sessions),
+ mTracker(new SessionsTracker()),
+ mBridge(bridge),
+ mLogger(logger)
+ {
+ }
+
+protected:
+ bool executeImpl()
+ {
+ bool tasksDispatched = false;
+ for (SessionSeq::iterator i = mSessions.begin(); i != mSessions.end(); ++i)
+ {
+ SessionWrapperPtr session = mSessionManager->getSession(*i);
+ if (session == 0)
+ {
+ //
+ // This shouldn't happen!
+ //
+ mTracker->addExceptionMessage(*i, "session not found");
+ continue;
+ }
+ tasksDispatched = true;
+ SessionControllerPtr controller = new BridgeSessionController(mBridge, session, mLogger);
+ std::string identity = (*i)->ice_getIdentity().name;
+ identity += ".bridgecontroller";
+ SessionControllerPrx controllerPrx =
+ SessionControllerPrx::uncheckedCast(mAdapter->add(controller, mAdapter->getCommunicator()->stringToIdentity(identity)));
+ (*i)->begin_setAndGetSessionController(controllerPrx, newCallback_Session_setAndGetSessionController(this,
+ &SetAndGetSessionControllerTask::get, &SetAndGetSessionControllerTask::failed), session);
+ }
+ return !tasksDispatched;
+ }
+
+ void get(const SessionControllerPrx& controller, const SessionWrapperPtr& session)
+ {
+ session->setSessionController(controller);
+ mTracker->add(session->getSession());
+ if (mTracker->responseCount() == mSessions.size())
+ {
+ mListener->succeeded();
+ }
+ }
+
+ void failed(const Ice::Exception& ex, const SessionWrapperPtr& session)
+ {
+ //
+ // TODO:
+ // * Log exception.
+ // * Rollback
+ // * Interpret exception and decide whether or not to fail the entire operations.
+ // Currently the semantics allow some operations to fail.
+ //
+ mTracker->addException(session->getSession(), ex);
+ try
+ {
+ //
+ // We want to make sure that session is laying around. The session collection will
+ // take care of cleaning it up as long as it is marked as destroyed.
+ //
+ session->destroy();
+ }
+ catch(...)
+ {
+ }
+ if (mTracker->responseCount() == mSessions.size())
+ {
+ SessionErrorSeq exceptions = mTracker->getExceptions();
+ if (exceptions.size() == mSessions.size())
+ {
+ mListener->failed();
+ }
+ else
+ {
+ mListener->succeeded();
+ }
+ }
+ }
+
+private:
+ Ice::ObjectAdapterPtr mAdapter;
+ SessionCollectionPtr mSessionManager;
+ SessionSeq mSessions;
+ SessionsTrackerPtr mTracker;
+ BridgeImplPtr mBridge;
+ Logger mLogger;
+};
+
class AddToListeners : public QueuedTask
{
public:
@@ -563,6 +696,7 @@ void BridgeImpl::addSessions_async(const AMD_Bridge_addSessionsPtr& callback, co
QueuedTasks tasks;
tasks.push_back(new SetBridgeTask(mSessions, mPrx, mSessionListenerPrx, sessions, tracker));
tasks.push_back(new AddToListeners(mListeners, tracker, getCookies()));
+ tasks.push_back(new SetAndGetSessionControllerTask(mObjAdapter, mSessions, sessions, this, mLogger));
tasks.push_back(new GenericAMDCallback<AMD_Bridge_addSessionsPtr>(callback, tracker));
tasks.push_back(new UpdateTask(this));
ExecutorPtr executor(new Executor(tasks, mLogger));
@@ -827,6 +961,7 @@ void BridgeImpl::replaceSession_async(const AMD_Bridge_replaceSessionPtr& callba
tasks.push_back(new RemoveSessionsNotify(mListeners, removeTracker, cookies));
tasks.push_back(new SetBridgeTask(mSessions, mPrx, mSessionListenerPrx, newSessions, tracker));
tasks.push_back(new AddToListeners(mListeners, tracker, cookies));
+ tasks.push_back(new SetAndGetSessionControllerTask(mObjAdapter, mSessions, newSessions, this, mLogger));
tasks.push_back(new GenericAMDCallback<AMD_Bridge_replaceSessionPtr>(callback, tracker));
tasks.push_back(new UpdateTask(this));
ExecutorPtr executor(new Executor(tasks, mLogger));
@@ -1037,6 +1172,7 @@ void BridgeImpl::getAddSessionsTasks(QueuedTasks& tasks,
SessionsTrackerPtr tracker(new SessionsTracker);
tasks.push_back(new SetBridgeTask(mSessions, mPrx, mSessionListenerPrx, sessions, tracker));
tasks.push_back(new AddToListeners(mListeners, tracker, getCookies()));
+ tasks.push_back(new SetAndGetSessionControllerTask(mObjAdapter, mSessions, sessions, this, mLogger));
tasks.push_back(new UpdateTask(this));
}
diff --git a/src/MediaSplicer.cpp b/src/MediaSplicer.cpp
index e4992fb..a6a0a1e 100755
--- a/src/MediaSplicer.cpp
+++ b/src/MediaSplicer.cpp
@@ -68,13 +68,12 @@ typedef vector<IncomingPairing> IncomingPairings;
struct MediaConnectorBuilder : public IceUtil::Shared
{
AsteriskSCF::SessionCommunications::V1::SessionPrx sessionPrx;
- AsteriskSCF::Media::V1::SessionPrx mediaSession;
- AsteriskSCF::Media::V1::StreamSourceSeq sources;
- AsteriskSCF::Media::V1::StreamSinkSeq sinks;
+ AsteriskSCF::Media::V1::StreamInformationDict streams;
MediaConnectorIPtr peer;
MediaConnectorIPtr connector;
OutgoingPairings outgoingPairings;
IncomingPairings incomingPairings;
+ std::map<std::string, std::string> connections;
};
typedef IceUtil::Handle<MediaConnectorBuilder> MediaConnectorBuilderPtr;
@@ -91,9 +90,9 @@ public:
const std::string& sessionId,
const ReplicatorSmartPrx& replicator,
const Logger& logger) :
- mMedia(data->mediaSession),
mOutgoing(data->outgoingPairings),
mIncoming(data->incomingPairings),
+ mStreams(data->streams),
mPeer(data->peer),
mConnected(true),
mBridgeId(bridgeId),
@@ -111,7 +110,7 @@ public:
const MediaConnectorPtr& peer,
const ReplicatorSmartPrx& replicator,
const Logger& logger) :
- mMedia(pairing->mediaSession),
+ mStreams(pairing->streams),
mPeer(peer),
mConnected(true),
mBridgeId(pairing->bridgeKey),
@@ -165,7 +164,7 @@ public:
}
void update(const MediaConnectorPtr& peer, const OutgoingPairings& outgoing,
- const IncomingPairings& incoming)
+ const IncomingPairings& incoming, const std::map<std::string, std::string>& newConnections)
{
mLogger(Debug) << FUNLOG << ": establishing a new peer connection.";
SessionPairingPtr currentState;
@@ -174,6 +173,7 @@ public:
mPeer = peer;
mOutgoing = outgoing;
mIncoming = incoming;
+ mConnections = newConnections;
currentState = createUpdate();
}
pushUpdate(currentState);
@@ -184,7 +184,7 @@ public:
if (mReplicator)
{
SessionPairingPtr currentState(new SessionPairing);
- currentState->mediaSession = mMedia;
+ currentState->streams = mStreams;
currentState->bridgeKey = mBridgeId;
currentState->sessionKey = mSessionId;
currentState->key = mKey;
@@ -230,9 +230,13 @@ public:
void destroy()
{
SessionPairingPtr newState;
+ vector<OutgoingPairing> outgoing;
+ vector<IncomingPairing> incoming;
{
boost::unique_lock<boost::shared_mutex> lock(mLock);
+ outgoing = mOutgoing;
mOutgoing.clear();
+ incoming = mIncoming;
mIncoming.clear();
mConnected = false;
newState = createUpdate();
@@ -247,37 +251,32 @@ public:
// it's media session constituents to stop communicating with anything
// else. This also used to try one-ways, that's probably a bad idea too.
//
- try
+ for (vector<OutgoingPairing>::const_iterator pairing = outgoing.begin();
+ pairing != outgoing.end();
+ ++pairing)
{
- StreamSourceSeq sources = mMedia->getSources();
- for (StreamSourceSeq::const_iterator i = sources.begin(); i != sources.end(); ++i)
+ try
{
- try
- {
-// (*i)->setSink(StreamSinkPrx());
-// TODO: call removeSink
- }
- catch (const Ice::Exception& ex)
- {
- mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
- }
+ pairing->first->setSource(StreamSourcePrx());
}
- StreamSinkSeq sinks = mMedia->getSinks();
- for (StreamSinkSeq::const_iterator i = sinks.begin(); i != sinks.end(); ++i)
+ catch (const Ice::Exception& ex)
{
- try
- {
- (*i)->setSource(StreamSourcePrx());
- }
- catch (const Ice::Exception& ex)
- {
- mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
- }
+ mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
}
}
- catch (const Ice::Exception& ex)
+
+ for (vector<IncomingPairing>::const_iterator pairing = incoming.begin();
+ pairing != incoming.end();
+ ++pairing)
{
- mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
+ try
+ {
+ pairing->first->removeSink(pairing->second);
+ }
+ catch (const Ice::Exception& ex)
+ {
+ mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
+ }
}
}
@@ -301,9 +300,13 @@ public:
{
mLogger(Debug) << FUNLOG << ": unplugging sinks and sources.";
SessionPairingPtr newState;
+ vector<OutgoingPairing> outgoing;
+ vector<IncomingPairing> incoming;
{
boost::unique_lock<boost::shared_mutex> lock(mLock);
+ outgoing = mOutgoing;
mOutgoing.clear();
+ incoming = mIncoming;
mIncoming.clear();
mConnected = false;
newState = createUpdate();
@@ -318,37 +321,32 @@ public:
// it's media session constituents to stop communicating with anything
// else. This also used to try one-ways, that's probably a bad idea too.
//
- try
+ for (vector<OutgoingPairing>::const_iterator pairing = outgoing.begin();
+ pairing != outgoing.end();
+ ++pairing)
{
- StreamSourceSeq sources = mMedia->getSources();
- for (StreamSourceSeq::const_iterator i = sources.begin(); i != sources.end(); ++i)
+ try
{
- try
- {
-// (*i)->setSink(StreamSinkPrx());
-// TODO: Call removeSink
- }
- catch (const Ice::Exception& ex)
- {
- mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
- }
+ pairing->first->setSource(StreamSourcePrx());
}
- StreamSinkSeq sinks = mMedia->getSinks();
- for (StreamSinkSeq::const_iterator i = sinks.begin(); i != sinks.end(); ++i)
+ catch (const Ice::Exception& ex)
{
- try
- {
- (*i)->setSource(StreamSourcePrx());
- }
- catch (const Ice::Exception& ex)
- {
- mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
- }
+ mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
}
}
- catch (const Ice::Exception& ex)
+
+ for (vector<IncomingPairing>::const_iterator pairing = incoming.begin();
+ pairing != incoming.end();
+ ++pairing)
{
- mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
+ try
+ {
+ pairing->first->removeSink(pairing->second);
+ }
+ catch (const Ice::Exception& ex)
+ {
+ mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
+ }
}
return true;
@@ -359,11 +357,21 @@ public:
return mKey;
}
+ StreamInformationDict streams()
+ {
+ return mStreams;
+ }
+
+ std::map<std::string, std::string> connections()
+ {
+ return mConnections;
+ }
+
private:
boost::shared_mutex mLock;
- AsteriskSCF::Media::V1::SessionPrx mMedia;
vector<OutgoingPairing> mOutgoing;
vector<IncomingPairing> mIncoming;
+ StreamInformationDict mStreams;
MediaConnectorPtr mPeer;
bool mConnected;
string mBridgeId;
@@ -372,6 +380,7 @@ private:
Ice::Long mRepCounter;
ReplicatorSmartPrx mReplicator;
Logger mLogger;
+ std::map<std::string, std::string> mConnections;
};
QueuedTasks createMediaConnectTasks(const SessionWrapperPtr& session,
@@ -404,11 +413,11 @@ public:
//
if (data->peer)
{
- data->peer->update(connector, data->outgoingPairings, data->incomingPairings);
+ data->peer->update(connector, data->outgoingPairings, data->incomingPairings, data->connections);
}
connector->initialUpdate();
boost::unique_lock<boost::shared_mutex> lock(mLock);
- mSessions.push_back(MediaSessionStruct(data->mediaSession, connector));
+ mConnectors.push_back(connector);
return connector;
}
@@ -422,13 +431,13 @@ public:
//
// Reap.
//
- MediaSessions::iterator i = mSessions.begin();
- while (i != mSessions.end())
+ MediaConnectors::iterator i = mConnectors.begin();
+ while (i != mConnectors.end())
{
- if (!i->connector->isConnected())
+ if (!(*i)->isConnected())
{
mLogger(Debug) << FUNLOG << ": reaping a connector";
- i = mSessions.erase(i);
+ i = mConnectors.erase(i);
continue;
}
++i;
@@ -439,9 +448,9 @@ public:
// one of them is going to have an empty connector.
//
MediaConnectorIPtr existing;
- if (mSessions.size() == 1)
+ if (mConnectors.size() == 1)
{
- existing = mSessions.back().connector;
+ existing = mConnectors.back();
existing->clearConnections();
}
@@ -459,126 +468,112 @@ public:
boost::unique_lock<boost::shared_mutex> lock(mLock);
MediaConnectorIPtr result;
- MediaSessions::iterator i = mSessions.begin();
- while (i != mSessions.end())
+ MediaConnectors::iterator i = mConnectors.begin();
+ while (i != mConnectors.end())
{
- if (!i->connector->isConnected())
+ if (!(*i)->isConnected())
{
mLogger(Debug) << FUNLOG << ": reaping a connector";
- MediaSessions::iterator t = i;
- i = mSessions.erase(t);
+ MediaConnectors::iterator t = i;
+ i = mConnectors.erase(t);
continue;
}
++i;
}
MediaConnectorPtr existing;
- if (mSessions.size() == 1)
+ if (mConnectors.size() == 1)
{
- existing = mSessions.back().connector;
+ existing = mConnectors.back();
existing->clearConnections();
}
result = new MediaConnectorI(pairings, existing, mReplicator, mLogger);
- mSessions.push_back(MediaSessionStruct(pairings->mediaSession, result));
+ mConnectors.push_back(result);
return result;
}
- vector<OutgoingPairing> findCompatiblePairings(const AsteriskSCF::Media::V1::SessionPrx& sessionToBeConnected, const StreamSinkSeq& sinks)
+ void findCompatiblePairings(const StreamInformationDict& streams, vector<OutgoingPairing>& outgoing,
+ vector<IncomingPairing>& incoming,
+ MediaConnectorIPtr connector,
+ std::map<std::string, std::string>& connections)
{
- vector<StreamSourceSeq> allSources;
-
+ // If no streams are present we can not establish any pairings
+ if (streams.empty())
{
- boost::shared_lock<boost::shared_mutex> lock(mLock);
- //
- // TODO: This is not really correct at all. When the bridge implements conferencing chances, everything
- // will come into a mix and go out.
- //
- for (MediaSessions::iterator i = mSessions.begin(); i != mSessions.end(); ++i)
- {
- if (i->mediaSession->ice_getIdentity() == sessionToBeConnected->ice_getIdentity())
- {
- continue;
- }
- allSources.push_back(i->mediaSession->getSources());
- }
+ return;
}
- vector<OutgoingPairing> result;
- for (StreamSinkSeq::const_iterator sink = sinks.begin(); sink != sinks.end(); ++sink)
+ boost::shared_lock<boost::shared_mutex> lock(mLock);
+
+ for (MediaConnectors::const_iterator i = mConnectors.begin(); i != mConnectors.end(); ++i)
{
- bool connected = false;
- for (vector<StreamSourceSeq>::iterator sources = allSources.begin(); sources != allSources.end() && !connected;
- ++sources)
+ if ((*i)->id() == connector->id())
{
- for (StreamSourceSeq::iterator source = sources->begin(); source != sources->end(); ++source)
- {
- if (canAdapt(*sink, *source))
- {
- result.push_back(OutgoingPairing(*sink, *source));
- sources->erase(source);
- connected = true;
- break;
- }
- }
+ continue;
}
- if (!connected)
+
+ StreamInformationDict theirStreams = (*i)->streams();
+
+ // If no streams are present go ahead and skip it
+ if (theirStreams.empty())
{
- //
- // TODO: We couldn't establish a proper connection for this sink. What happens
- // here is probably a policy thing.
- //
+ continue;
}
- }
- return result;
- }
- vector<IncomingPairing> findCompatiblePairings(const AsteriskSCF::Media::V1::SessionPrx& sessionToBeConnected, const StreamSourceSeq& sources)
- {
- vector<StreamSinkSeq> allSinks;
- {
- boost::shared_lock<boost::shared_mutex> lock(mLock);
- //
- // TODO: This is not really correct at all. When the bridge implements conferencing chances, everything
- // will come into a mix and go out.
- //
- for (MediaSessions::iterator i = mSessions.begin(); i != mSessions.end(); ++i)
+ // Iterate our own streams looking for any that can be connected to this session
+ for (StreamInformationDict::const_iterator stream = streams.begin();
+ stream != streams.end();
+ ++stream)
{
- if (i->mediaSession->ice_getIdentity() == sessionToBeConnected->ice_getIdentity())
+ if (stream->second->state == Removed)
{
continue;
}
- allSinks.push_back(i->mediaSession->getSinks());
- }
- }
- vector<IncomingPairing> result;
- for (StreamSourceSeq::const_iterator source = sources.begin(); source != sources.end(); ++source)
- {
- bool connected = false;
- for (vector<StreamSinkSeq>::iterator sinks = allSinks.begin(); sinks != allSinks.end() && !connected; ++sinks)
- {
- for (StreamSinkSeq::iterator sink = sinks->begin(); sink != sinks->end(); ++sink)
+ for (StreamInformationDict::const_iterator theirStream = theirStreams.begin();
+ theirStream != theirStreams.end();
+ ++theirStream)
{
- if (canAdapt(*source, *sink))
+ if (theirStream->second->state == Removed)
{
- result.push_back(IncomingPairing(*source, *sink));
- sink = sinks->erase(sink);
- connected = true;
- break;
+ continue;
}
+
+ // Is this stream compatible with the other stream?
+ if (haveCommonFormats(stream->second, theirStream->second) == false)
+ {
+ continue;
+ }
+
+ // Connect all available sources and sinks together
+ StreamSinkSeq sinks = stream->second->sinks;
+ StreamSourceSeq sources = stream->second->sources;
+
+ while (!sinks.empty() && !theirStream->second->sources.empty())
+ {
+ outgoing.push_back(OutgoingPairing(sinks.back(), theirStream->second->sources.back()));
+ sinks.pop_back();
+ theirStream->second->sources.pop_back();
+ }
+
+ while (!sources.empty() && !theirStream->second->sinks.empty())
+ {
+ incoming.push_back(IncomingPairing(sources.back(), theirStream->second->sinks.back()));
+ sources.pop_back();
+ theirStream->second->sinks.pop_back();
+ }
+
+ // Record this stream connection
+ connections.insert(make_pair(theirStream->first, stream->first));
+
+ // This stream has been connected so no longer consider it
+ theirStreams.erase(theirStream->first);
+ break;
}
}
- if (!connected)
- {
- //
- // TODO: We couldn't establish a proper connection for this source! What happens now
- // should be a matter of policy.
- //
- }
}
- return result;
}
-
+
private:
boost::shared_mutex mLock;
@@ -587,30 +582,9 @@ private:
ReplicatorSmartPrx mReplicator;
Logger mLogger;
- struct MediaSessionStruct
- {
- SessionPrx mediaSession;
- MediaConnectorIPtr connector;
-
- MediaSessionStruct(const SessionPrx& session, const MediaConnectorIPtr& con) :
- mediaSession(session),
- connector(con)
- {
- }
- };
-
- typedef vector<MediaSessionStruct> MediaSessions;
+ typedef vector<MediaConnectorIPtr> MediaConnectors;
- MediaSessions mSessions;
-
- bool needMixing()
- {
- //
- // TODO: We are only supporting single stream with two endpoints for the moment, so no mixing will
- // be required.
- //
- return false;
- }
+ MediaConnectors mConnectors;
bool hasPreferredCodecOverride()
{
@@ -622,93 +596,50 @@ private:
return false;
}
- bool canAdapt(const Media::V1::StreamSourcePrx&, const Media::V1::StreamSinkPrx&)
- {
- //
- // For the moment we are assuming two streams that can definitely talk to each other.
- //
- return true;
- }
-
- bool canAdapt(const Media::V1::StreamSinkPrx&, const Media::V1::StreamSourcePrx&)
+ bool canAdapt(const StreamInformationPtr&, const StreamInformationPtr&)
{
- //
- // For the moment we are assuming two streams that can definitely talk to each other.
- //
- return true;
- }
-};
-
-class GetMediaSession : public QueuedTask
-{
-public:
- GetMediaSession(const SessionWrapperPtr& session, const MediaConnectorBuilderPtr& materials) :
- QueuedTask("GetMediaSession"),
- mSession(session),
- mMaterials(materials)
- {
- }
-
-protected:
- bool executeImpl()
- {
- mMaterials->sessionPrx->begin_getMediaSession(
- AsteriskSCF::SessionCommunications::V1::newCallback_Session_getMediaSession(this,
- &GetMediaSession::done, &GetMediaSession::failed));
+ // For the moment we can not transcode
return false;
}
- void done(const AsteriskSCF::Media::V1::SessionPrx& mediaSession)
+ bool haveCommonFormats(const StreamInformationPtr& stream0, const StreamInformationPtr& stream1)
{
- mMaterials->mediaSession = mediaSession;
- mListener->succeeded();
- }
+ for (FormatSeq::const_iterator format0 = stream0->formats.begin();
+ format0 != stream0->formats.end();
+ ++format0)
+ {
+ for (FormatSeq::const_iterator format1 = stream1->formats.begin();
+ format1 != stream1->formats.end();
+ ++format1)
+ {
+ // Is the name alike?
+ if ((*format0)->name != (*format1)->name)
+ {
+ continue;
+ }
- void failed(const Ice::Exception&)
- {
- mListener->failed();
- }
-private:
- SessionWrapperPtr mSession;
- MediaConnectorBuilderPtr mMaterials;
-};
+ // Is a format operations service available, and if so are they compatible?
+ if ((*format0)->operations &&
+ (((*format0)->operations != (*format1)->operations ||
+ ((*format0)->operations->checkCompatible((*format0), (*format1)) == false))))
+ {
+ continue;
+ }
-class GetSinks : public QueuedTask
-{
-public:
- GetSinks(const MediaConnectorBuilderPtr& materials) :
- QueuedTask("GetSinks"),
- mMaterials(materials)
- {
- }
+ return true;
+ }
+ }
-protected:
- bool executeImpl()
- {
- mMaterials->mediaSession->begin_getSinks(newCallback_Session_getSinks(this,
- &GetSinks::done, &GetSinks::failed));
return false;
}
-
- void done(const AsteriskSCF::Media::V1::StreamSinkSeq& sinks)
- {
- mMaterials->sinks = sinks;
- mListener->succeeded();
- }
-
- void failed(const Ice::Exception&)
- {
- mListener->failed();
- }
-private:
- MediaConnectorBuilderPtr mMaterials;
};
-class GetSources : public QueuedTask
+class GetStreams : public QueuedTask
{
public:
- GetSources(const MediaConnectorBuilderPtr& materials) :
- QueuedTask("GetSources"),
+ GetStreams(const SessionWrapperPtr& session, const MediaConnectorBuilderPtr& materials) :
+ QueuedTask("GetStreams"),
+ mSession(session),
mMaterials(materials)
{
}
@@ -716,14 +647,15 @@ public:
protected:
bool executeImpl()
{
- mMaterials->mediaSession->begin_getSources(newCallback_Session_getSources(this,
- &GetSources::done, &GetSources::failed));
+ mMaterials->sessionPrx->begin_getStreams(
+ AsteriskSCF::SessionCommunications::V1::newCallback_Session_getStreams(this,
+ &GetStreams::done, &GetStreams::failed));
return false;
}
- void done(const AsteriskSCF::Media::V1::StreamSourceSeq& sources)
+ void done(const AsteriskSCF::Media::V1::StreamInformationDict& streams)
{
- mMaterials->sources = sources;
+ mMaterials->streams = streams;
mListener->succeeded();
}
@@ -731,8 +663,8 @@ protected:
{
mListener->failed();
}
-
private:
+ SessionWrapperPtr mSession;
MediaConnectorBuilderPtr mMaterials;
};
@@ -749,8 +681,8 @@ public:
protected:
bool executeImpl()
{
- mMaterials->incomingPairings = mSplicer->findCompatiblePairings(mMaterials->mediaSession, mMaterials->sources);
- mMaterials->outgoingPairings = mSplicer->findCompatiblePairings(mMaterials->mediaSession, mMaterials->sinks);
+ mSplicer->findCompatiblePairings(mMaterials->streams, mMaterials->outgoingPairings, mMaterials->incomingPairings,
+ mMaterials->connector, mMaterials->connections);
return true;
}
@@ -897,7 +829,7 @@ protected:
{
if (mMaterials->connector)
{
- mMaterials->connector->update(mMaterials->peer, mMaterials->outgoingPairings, mMaterials->incomingPairings);
+ mMaterials->connector->update(mMaterials->peer, mMaterials->outgoingPairings, mMaterials->incomingPairings, mMaterials->connections);
}
mListener->succeeded();
}
@@ -919,7 +851,7 @@ protected:
{
if (mMaterials->connector)
{
- mMaterials->connector->update(mMaterials->peer, mMaterials->outgoingPairings, mMaterials->incomingPairings);
+ mMaterials->connector->update(mMaterials->peer, mMaterials->outgoingPairings, mMaterials->incomingPairings, mMaterials->connections);
}
mListener->succeeded();
}
@@ -995,9 +927,7 @@ QueuedTasks createMediaConnectTasks(const SessionWrapperPtr& session,
MediaConnectorBuilderPtr materials(new MediaConnectorBuilder);
materials->peer = peer;
materials->sessionPrx = sessionPrx;
- tasks.push_back(new GetMediaSession(session, materials));
- tasks.push_back(new GetSinks(materials));
- tasks.push_back(new GetSources(materials));
+ tasks.push_back(new GetStreams(session, materials));
tasks.push_back(new CreateAndRegisterConnector(session, splicer, materials));
tasks.push_back(new GetCompatiblePairings(splicer, materials));
tasks.push_back(new MakeConnections(materials));
diff --git a/src/MediaSplicer.h b/src/MediaSplicer.h
index e00b2a9..31b3f19 100644
--- a/src/MediaSplicer.h
+++ b/src/MediaSplicer.h
@@ -72,6 +72,11 @@ public:
* Replication support function.
**/
virtual void update(const AsteriskSCF::Replication::BridgeService::V1::SessionPairingPtr& update) = 0;
+
+ /**
+ * Return connection information.
+ */
+ virtual std::map<std::string, std::string> connections() = 0;
};
typedef IceUtil::Handle<MediaConnector> MediaConnectorPtr;
diff --git a/src/SessionOperations.cpp b/src/SessionOperations.cpp
index 58670a9..f7f8508 100644
--- a/src/SessionOperations.cpp
+++ b/src/SessionOperations.cpp
@@ -100,3 +100,126 @@ void RelayIndication::operator()(const SessionWrapperPtr& session)
}
}
}
+
+AddStreamsOperation::AddStreamsOperation(const AsteriskSCF::SessionCommunications::V1::AMD_SessionController_addStreamsPtr& cb,
+ const SessionWrapperPtr& self,
+ const AsteriskSCF::Media::V1::StreamInformationDict& streams,
+ const Logger& logger) :
+ mCb(cb),
+ mSelf(self),
+ mStreams(streams),
+ mIgnore(false),
+ mLogger(logger)
+{
+}
+
+void AddStreamsOperation::operator()(const SessionWrapperPtr& session)
+{
+ if (mIgnore == true || mSelf == session)
+ {
+ return;
+ }
+
+ SessionControllerPrx controller = session->getSessionController();
+
+ if (!controller)
+ {
+ return;
+ }
+
+ // Go ahead and request that the streams be added
+ controller->begin_addStreams(mStreams, newCallback_SessionController_addStreams(this,
+ &AddStreamsOperation::added,
+ &AddStreamsOperation::failed),
+ session);
+
+ // We only allow requesting streams on a single session at this point
+ mIgnore = true;
+}
+
+void AddStreamsOperation::added(const AsteriskSCF::Media::V1::StreamInformationDict& streams,
+ const SessionWrapperPtr& session)
+{
+ // If any streams were added set up media so it flows
+ if (!streams.empty())
+ {
+ session->unplugMedia();
+ mSelf->unplugMedia();
+ session->setupMedia();
+ mSelf->setupMedia();
+ }
+
+ // Now that everything is ready go ahead and respond
+ mCb->ice_response(streams);
+}
+
+void AddStreamsOperation::failed(const Ice::Exception& ex, const SessionWrapperPtr& session)
+{
+ // Since this failed tell the session that initiated this operation that no streams were added
+ mCb->ice_response(AsteriskSCF::Media::V1::StreamInformationDict());
+
+ mLogger(Debug) << FUNLOG << ": (" << session->getSession() << ") " << ex.what();
+}
+
+RemoveStreamsOperation::RemoveStreamsOperation(const SessionWrapperPtr& self,
+ const AsteriskSCF::Media::V1::StreamInformationDict& streams) :
+ mSelf(self),
+ mStreams(streams),
+ mIgnore(false)
+{
+}
+
+void RemoveStreamsOperation::operator()(const SessionWrapperPtr& session)
+{
+ if (mIgnore == true || mSelf == session)
+ {
+ return;
+ }
+
+ AsteriskSCF::Media::V1::StreamInformationDict remove;
+ std::map<std::string, std::string> connections = session->connections();
+
+ // Determine the streams we actually need to remove
+ for (AsteriskSCF::Media::V1::StreamInformationDict::const_iterator stream = mStreams.begin();
+ stream != mStreams.end();
+ ++stream)
+ {
+ std::map<std::string, std::string>::const_iterator connection = connections.find(stream->first);
+
+ if (connection != connections.end())
+ {
+ remove.insert(make_pair(connection->second, new AsteriskSCF::Media::V1::StreamInformation()));
+ }
+ }
+
+ if (remove.empty())
+ {
+ return;
+ }
+
+ SessionControllerPrx controller = session->getSessionController();
+
+ if (!controller)
+ {
+ return;
+ }
+
+ controller->begin_removeStreams(remove, newCallback_SessionController_removeStreams(this,
+ &RemoveStreamsOperation::removed,
+ &RemoveStreamsOperation::failed),
+ session);
+
+ mIgnore = true;
+}
+
+void RemoveStreamsOperation::removed(const SessionWrapperPtr& session)
+{
+ session->unplugMedia();
+ mSelf->unplugMedia();
+ session->setupMedia();
+ mSelf->setupMedia();
+}
+
+void RemoveStreamsOperation::failed(const Ice::Exception&, const SessionWrapperPtr&)
+{
+}
diff --git a/src/SessionOperations.h b/src/SessionOperations.h
index 97726fc..ce73388 100644
--- a/src/SessionOperations.h
+++ b/src/SessionOperations.h
@@ -16,6 +16,7 @@
#pragma once
#include <AsteriskSCF/SessionCommunications/SessionCommunicationsIf.h>
+#include <AsteriskSCF/Media/MediaIf.h>
#include <AsteriskSCF/logger.h>
#include "BridgeReplicatorIf.h"
#include "SessionWrapper.h"
@@ -77,6 +78,46 @@ private:
bool mIncludeConnected;
};
+class AddStreamsOperation : public std::unary_function<SessionWrapperPtr, void>, public Ice::Object
+{
+public:
+ AddStreamsOperation(const AsteriskSCF::SessionCommunications::V1::AMD_SessionController_addStreamsPtr& cb,
+ const SessionWrapperPtr& self,
+ const AsteriskSCF::Media::V1::StreamInformationDict& streams,
+ const AsteriskSCF::System::Logging::Logger& logger);
+
+ void operator()(const SessionWrapperPtr& session);
+
+ void added(const AsteriskSCF::Media::V1::StreamInformationDict& streams, const SessionWrapperPtr& session);
+ void failed(const Ice::Exception& ex, const SessionWrapperPtr& session);
+
+private:
+ AsteriskSCF::SessionCommunications::V1::AMD_SessionController_addStreamsPtr mCb;
+ SessionWrapperPtr mSelf;
+ AsteriskSCF::Media::V1::StreamInformationDict mStreams;
+ bool mIgnore;
+ AsteriskSCF::System::Logging::Logger mLogger;
+};
+
+typedef IceUtil::Handle<AddStreamsOperation> AddStreamsOperationPtr;
+
+class RemoveStreamsOperation : public std::unary_function<SessionWrapperPtr, void>, public Ice::Object
+{
+public:
+ RemoveStreamsOperation(const SessionWrapperPtr& self,
+ const AsteriskSCF::Media::V1::StreamInformationDict& streams);
+
+ void operator()(const SessionWrapperPtr& session);
+
+ void removed(const SessionWrapperPtr& session);
+ void failed(const Ice::Exception& ex, const SessionWrapperPtr& session);
+
+private:
+ SessionWrapperPtr mSelf;
+ AsteriskSCF::Media::V1::StreamInformationDict mStreams;
+ bool mIgnore;
+};
+
class IfStateCriteria
{
public:
diff --git a/src/SessionWrapper.cpp b/src/SessionWrapper.cpp
index ea9f8fa..1178f5e 100644
--- a/src/SessionWrapper.cpp
+++ b/src/SessionWrapper.cpp
@@ -467,6 +467,19 @@ SessionPrx SessionWrapper::getSession() const
return mSession->session;
}
+SessionControllerPrx SessionWrapper::getSessionController() const
+{
+ boost::shared_lock<boost::shared_mutex> lock(mLock);
+ return mSessionController;
+}
+
+void SessionWrapper::setSessionController(const AsteriskSCF::SessionCommunications::V1::SessionControllerPrx& controller)
+{
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ mSessionController = controller;
+}
+
+
void SessionWrapper::setupMedia()
{
mLogger(Debug) << FUNLOG << " for " << mId;
@@ -566,6 +579,16 @@ string SessionWrapper::id()
return mId;
}
+std::map<std::string, std::string> SessionWrapper::connections()
+{
+ if (mConnector)
+ {
+ return mConnector->connections();
+ }
+
+ return std::map<std::string, std::string>();
+}
+
void SessionWrapper::setup()
{
mLogger(Debug) << FUNLOG << ": starting setup of connected session " << mId;
diff --git a/src/SessionWrapper.h b/src/SessionWrapper.h
index 9ddb43c..0bdef83 100644
--- a/src/SessionWrapper.h
+++ b/src/SessionWrapper.h
@@ -81,6 +81,16 @@ public:
AsteriskSCF::SessionCommunications::V1::SessionPrx getSession() const;
/**
+ * Accesses the session controller proxy that the session gave us.
+ */
+ AsteriskSCF::SessionCommunications::V1::SessionControllerPrx getSessionController() const;
+
+ /**
+ * Sets the session controller used to control this session.
+ */
+ void setSessionController(const AsteriskSCF::SessionCommunications::V1::SessionControllerPrx&);
+
+ /**
* Sets the media connector object. This updates the media pairings associated
* with this session.
* Initiates replication.
@@ -114,6 +124,8 @@ public:
std::string id();
+ std::map<std::string, std::string> connections();
+
//
// Large scale macro operations. In effect the wrapper becomes a sub-component within the bridge.
//
@@ -144,6 +156,7 @@ private:
std::string mId;
MediaSplicerPtr mSplicer;
ExecutorPtr mActivities;
+ AsteriskSCF::SessionCommunications::V1::SessionControllerPrx mSessionController;
/**
* Sends changes to the replication service. This should never occur
-----------------------------------------------------------------------
--
asterisk-scf/release/bridging.git
More information about the asterisk-scf-commits
mailing list