[asterisk-scf-commits] asterisk-scf/integration/bridging.git branch "sessioncontroller" created.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Mon Jul 18 18:58:20 CDT 2011
branch "sessioncontroller" has been created
at efb149a5e3937f8db08d9e057286f7a99be8fca1 (commit)
- Log -----------------------------------------------------------------
commit efb149a5e3937f8db08d9e057286f7a99be8fca1
Author: Joshua Colp <jcolp at digium.com>
Date: Mon Jul 18 20:59:27 2011 -0300
Add streams support for media splicing.
diff --git a/slice/AsteriskSCF/Replication/BridgeService/BridgeReplicatorIf.ice b/slice/AsteriskSCF/Replication/BridgeService/BridgeReplicatorIf.ice
index 4b404cb..d352fbb 100644
--- a/slice/AsteriskSCF/Replication/BridgeService/BridgeReplicatorIf.ice
+++ b/slice/AsteriskSCF/Replication/BridgeService/BridgeReplicatorIf.ice
@@ -107,7 +107,7 @@ class SessionPairing extends ReplicatedStateItem
{
string bridgeKey;
string sessionKey;
- AsteriskSCF::Media::V1::Session* mediaSession;
+ AsteriskSCF::Media::V1::StreamInformationDict streams;
MediaPairingSeq incomingMediaPairings;
MediaPairingSeq outgoingMediaPairings;
};
diff --git a/src/MediaSplicer.cpp b/src/MediaSplicer.cpp
index e4992fb..6263e71 100755
--- a/src/MediaSplicer.cpp
+++ b/src/MediaSplicer.cpp
@@ -68,9 +68,7 @@ typedef vector<IncomingPairing> IncomingPairings;
struct MediaConnectorBuilder : public IceUtil::Shared
{
AsteriskSCF::SessionCommunications::V1::SessionPrx sessionPrx;
- AsteriskSCF::Media::V1::SessionPrx mediaSession;
- AsteriskSCF::Media::V1::StreamSourceSeq sources;
- AsteriskSCF::Media::V1::StreamSinkSeq sinks;
+ AsteriskSCF::Media::V1::StreamInformationDict streams;
MediaConnectorIPtr peer;
MediaConnectorIPtr connector;
OutgoingPairings outgoingPairings;
@@ -91,7 +89,7 @@ public:
const std::string& sessionId,
const ReplicatorSmartPrx& replicator,
const Logger& logger) :
- mMedia(data->mediaSession),
+ mStreams(data->streams),
mOutgoing(data->outgoingPairings),
mIncoming(data->incomingPairings),
mPeer(data->peer),
@@ -111,7 +109,7 @@ public:
const MediaConnectorPtr& peer,
const ReplicatorSmartPrx& replicator,
const Logger& logger) :
- mMedia(pairing->mediaSession),
+ mStreams(pairing->streams),
mPeer(peer),
mConnected(true),
mBridgeId(pairing->bridgeKey),
@@ -184,7 +182,7 @@ public:
if (mReplicator)
{
SessionPairingPtr currentState(new SessionPairing);
- currentState->mediaSession = mMedia;
+ currentState->streams = mStreams;
currentState->bridgeKey = mBridgeId;
currentState->sessionKey = mSessionId;
currentState->key = mKey;
@@ -230,9 +228,13 @@ public:
void destroy()
{
SessionPairingPtr newState;
+ vector<OutgoingPairing> outgoing;
+ vector<IncomingPairing> incoming;
{
boost::unique_lock<boost::shared_mutex> lock(mLock);
+ outgoing = mOutgoing;
mOutgoing.clear();
+ incoming = mIncoming;
mIncoming.clear();
mConnected = false;
newState = createUpdate();
@@ -247,37 +249,32 @@ public:
// its media session constituents to stop communicating with anything
// else. This also used to try one-ways, that's probably a bad idea too.
//
- try
+ for (vector<OutgoingPairing>::const_iterator pairing = outgoing.begin();
+ pairing != outgoing.end();
+ ++pairing)
{
- StreamSourceSeq sources = mMedia->getSources();
- for (StreamSourceSeq::const_iterator i = sources.begin(); i != sources.end(); ++i)
+ try
{
- try
- {
-// (*i)->setSink(StreamSinkPrx());
-// TODO: call removeSink
- }
- catch (const Ice::Exception& ex)
- {
- mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
- }
+ pairing->first->setSource(StreamSourcePrx());
}
- StreamSinkSeq sinks = mMedia->getSinks();
- for (StreamSinkSeq::const_iterator i = sinks.begin(); i != sinks.end(); ++i)
+ catch (const Ice::Exception& ex)
{
- try
- {
- (*i)->setSource(StreamSourcePrx());
- }
- catch (const Ice::Exception& ex)
- {
- mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
- }
+ mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
}
}
- catch (const Ice::Exception& ex)
+
+ for (vector<IncomingPairing>::const_iterator pairing = incoming.begin();
+ pairing != incoming.end();
+ ++pairing)
{
- mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
+ try
+ {
+ pairing->first->removeSink(pairing->second);
+ }
+ catch (const Ice::Exception& ex)
+ {
+ mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
+ }
}
}
@@ -301,9 +298,13 @@ public:
{
mLogger(Debug) << FUNLOG << ": unplugging sinks and sources.";
SessionPairingPtr newState;
+ vector<OutgoingPairing> outgoing;
+ vector<IncomingPairing> incoming;
{
boost::unique_lock<boost::shared_mutex> lock(mLock);
+ outgoing = mOutgoing;
mOutgoing.clear();
+ incoming = mIncoming;
mIncoming.clear();
mConnected = false;
newState = createUpdate();
@@ -318,37 +319,32 @@ public:
// its media session constituents to stop communicating with anything
// else. This also used to try one-ways, that's probably a bad idea too.
//
- try
+ for (vector<OutgoingPairing>::const_iterator pairing = outgoing.begin();
+ pairing != outgoing.end();
+ ++pairing)
{
- StreamSourceSeq sources = mMedia->getSources();
- for (StreamSourceSeq::const_iterator i = sources.begin(); i != sources.end(); ++i)
+ try
{
- try
- {
-// (*i)->setSink(StreamSinkPrx());
-// TODO: Call removeSink
- }
- catch (const Ice::Exception& ex)
- {
- mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
- }
+ pairing->first->setSource(StreamSourcePrx());
}
- StreamSinkSeq sinks = mMedia->getSinks();
- for (StreamSinkSeq::const_iterator i = sinks.begin(); i != sinks.end(); ++i)
+ catch (const Ice::Exception& ex)
{
- try
- {
- (*i)->setSource(StreamSourcePrx());
- }
- catch (const Ice::Exception& ex)
- {
- mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
- }
+ mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
}
}
- catch (const Ice::Exception& ex)
+
+ for (vector<IncomingPairing>::const_iterator pairing = incoming.begin();
+ pairing != incoming.end();
+ ++pairing)
{
- mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
+ try
+ {
+ pairing->first->removeSink(pairing->second);
+ }
+ catch (const Ice::Exception& ex)
+ {
+ mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
+ }
}
return true;
@@ -361,9 +357,9 @@ public:
private:
boost::shared_mutex mLock;
- AsteriskSCF::Media::V1::SessionPrx mMedia;
vector<OutgoingPairing> mOutgoing;
vector<IncomingPairing> mIncoming;
+ StreamInformationDict mStreams;
MediaConnectorPtr mPeer;
bool mConnected;
string mBridgeId;
@@ -408,7 +404,7 @@ public:
}
connector->initialUpdate();
boost::unique_lock<boost::shared_mutex> lock(mLock);
- mSessions.push_back(MediaSessionStruct(data->mediaSession, connector));
+ mSessions.push_back(MediaSessionStruct(data->streams, connector));
return connector;
}
@@ -479,106 +475,64 @@ public:
existing->clearConnections();
}
result = new MediaConnectorI(pairings, existing, mReplicator, mLogger);
- mSessions.push_back(MediaSessionStruct(pairings->mediaSession, result));
+ mSessions.push_back(MediaSessionStruct(pairings->streams, result));
return result;
}
- vector<OutgoingPairing> findCompatiblePairings(const AsteriskSCF::Media::V1::SessionPrx& sessionToBeConnected, const StreamSinkSeq& sinks)
+ void findCompatiblePairings(const StreamInformationDict& streams, vector<OutgoingPairing>& outgoing,
+ vector<IncomingPairing>& incoming)
{
- vector<StreamSourceSeq> allSources;
-
+ // If no streams are present we can not establish any pairings
+ if (streams.empty())
{
- boost::shared_lock<boost::shared_mutex> lock(mLock);
- //
- // TODO: This is not really correct at all. When the bridge implements conferencing chances, everything
- // will come into a mix and go out.
- //
- for (MediaSessions::iterator i = mSessions.begin(); i != mSessions.end(); ++i)
- {
- if (i->mediaSession->ice_getIdentity() == sessionToBeConnected->ice_getIdentity())
- {
- continue;
- }
- allSources.push_back(i->mediaSession->getSources());
- }
+ return;
}
- vector<OutgoingPairing> result;
- for (StreamSinkSeq::const_iterator sink = sinks.begin(); sink != sinks.end(); ++sink)
- {
- bool connected = false;
- for (vector<StreamSourceSeq>::iterator sources = allSources.begin(); sources != allSources.end() && !connected;
- ++sources)
- {
- for (StreamSourceSeq::iterator source = sources->begin(); source != sources->end(); ++source)
- {
- if (canAdapt(*sink, *source))
- {
- result.push_back(OutgoingPairing(*sink, *source));
- sources->erase(source);
- connected = true;
- break;
- }
- }
- }
- if (!connected)
- {
- //
- // TODO: We couldn't establish a proper connection for this sink. What happens
- // here is probably a policy thing.
- //
- }
- }
- return result;
- }
+ boost::shared_lock<boost::shared_mutex> lock(mLock);
- vector<IncomingPairing> findCompatiblePairings(const AsteriskSCF::Media::V1::SessionPrx& sessionToBeConnected, const StreamSourceSeq& sources)
- {
- vector<StreamSinkSeq> allSinks;
+ for (MediaSessions::const_iterator i = mSessions.begin(); i != mSessions.end(); ++i)
{
- boost::shared_lock<boost::shared_mutex> lock(mLock);
- //
- // TODO: This is not really correct at all. When the bridge implements conferencing chances, everything
- // will come into a mix and go out.
- //
- for (MediaSessions::iterator i = mSessions.begin(); i != mSessions.end(); ++i)
+ // If no streams are present go ahead and skip it
+ if (i->streams.empty())
{
- if (i->mediaSession->ice_getIdentity() == sessionToBeConnected->ice_getIdentity())
- {
- continue;
- }
- allSinks.push_back(i->mediaSession->getSinks());
+ continue;
}
- }
- vector<IncomingPairing> result;
- for (StreamSourceSeq::const_iterator source = sources.begin(); source != sources.end(); ++source)
- {
- bool connected = false;
- for (vector<StreamSinkSeq>::iterator sinks = allSinks.begin(); sinks != allSinks.end() && !connected; ++sinks)
+ // Create a copy so we can remove streams once pairings have been established for them
+ StreamInformationDict theirStreams = i->streams;
+
+ // Iterate our own streams looking for any that can be connected to this session
+ for (StreamInformationDict::const_iterator stream = streams.begin();
+ stream != streams.end();
+ ++stream)
{
- for (StreamSinkSeq::iterator sink = sinks->begin(); sink != sinks->end(); ++sink)
+ for (StreamInformationDict::const_iterator theirStream = theirStreams.begin();
+ theirStream != theirStreams.end();
+ ++theirStream)
{
- if (canAdapt(*source, *sink))
+ // If this is us skip all streams
+ if (stream->second->sinks.front() == theirStream->second->sinks.front())
{
- result.push_back(IncomingPairing(*source, *sink));
- sink = sinks->erase(sink);
- connected = true;
break;
}
+
+ // Is this stream compatible with the other stream?
+ if (haveCommonFormats(stream->second, theirStream->second) == false)
+ {
+ continue;
+ }
+
+ // Connect sources and sinks together, right now we only support the first
+ outgoing.push_back(OutgoingPairing(stream->second->sinks.front(), theirStream->second->sources.front()));
+ incoming.push_back(IncomingPairing(stream->second->sources.front(), theirStream->second->sinks.front()));
+
+ theirStreams.erase(theirStream->first);
+ break;
}
}
- if (!connected)
- {
- //
- // TODO: We couldn't establish a proper connection for this source! What happens now
- // should be a matter of policy.
- //
- }
}
- return result;
}
-
+
private:
boost::shared_mutex mLock;
@@ -589,11 +543,11 @@ private:
struct MediaSessionStruct
{
- SessionPrx mediaSession;
+ StreamInformationDict streams;
MediaConnectorIPtr connector;
- MediaSessionStruct(const SessionPrx& session, const MediaConnectorIPtr& con) :
- mediaSession(session),
+ MediaSessionStruct(const StreamInformationDict& strms, const MediaConnectorIPtr& con) :
+ streams(strms),
connector(con)
{
}
@@ -603,15 +557,6 @@ private:
MediaSessions mSessions;
- bool needMixing()
- {
- //
- // TODO: We are only supporting single stream with two endpoints for the moment, so no mixing will
- // be required.
- //
- return false;
- }
-
bool hasPreferredCodecOverride()
{
//
@@ -622,93 +567,50 @@ private:
return false;
}
- bool canAdapt(const Media::V1::StreamSourcePrx&, const Media::V1::StreamSinkPrx&)
- {
- //
- // For the moment we are assuming two streams that can definitely talk to each other.
- //
- return true;
- }
-
- bool canAdapt(const Media::V1::StreamSinkPrx&, const Media::V1::StreamSourcePrx&)
+ bool canAdapt(const StreamInformationPtr&, const StreamInformationPtr&)
{
- //
- // For the moment we are assuming two streams that can definitely talk to each other.
- //
- return true;
- }
-};
-
-class GetMediaSession : public QueuedTask
-{
-public:
- GetMediaSession(const SessionWrapperPtr& session, const MediaConnectorBuilderPtr& materials) :
- QueuedTask("GetMediaSession"),
- mSession(session),
- mMaterials(materials)
- {
- }
-
-protected:
- bool executeImpl()
- {
- mMaterials->sessionPrx->begin_getMediaSession(
- AsteriskSCF::SessionCommunications::V1::newCallback_Session_getMediaSession(this,
- &GetMediaSession::done, &GetMediaSession::failed));
+ // For the moment we can not transcode
return false;
}
- void done(const AsteriskSCF::Media::V1::SessionPrx& mediaSession)
+ bool haveCommonFormats(const StreamInformationPtr& stream0, const StreamInformationPtr& stream1)
{
- mMaterials->mediaSession = mediaSession;
- mListener->succeeded();
- }
+ for (FormatSeq::const_iterator format0 = stream0->formats.begin();
+ format0 != stream0->formats.end();
+ ++format0)
+ {
+ for (FormatSeq::const_iterator format1 = stream1->formats.begin();
+ format1 != stream1->formats.end();
+ ++format1)
+ {
+ // Is the name alike?
+ if ((*format0)->name != (*format1)->name)
+ {
+ continue;
+ }
- void failed(const Ice::Exception&)
- {
- mListener->failed();
- }
-private:
- SessionWrapperPtr mSession;
- MediaConnectorBuilderPtr mMaterials;
-};
+ // Is a format operations service available, and if so are they compatible?
+ if ((*format0)->operations &&
+ (((*format0)->operations != (*format1)->operations ||
+ ((*format0)->operations->checkCompatible((*format0), (*format1)) == false))))
+ {
+ continue;
+ }
-class GetSinks : public QueuedTask
-{
-public:
- GetSinks(const MediaConnectorBuilderPtr& materials) :
- QueuedTask("GetSinks"),
- mMaterials(materials)
- {
- }
+ return true;
+ }
+ }
-protected:
- bool executeImpl()
- {
- mMaterials->mediaSession->begin_getSinks(newCallback_Session_getSinks(this,
- &GetSinks::done, &GetSinks::failed));
return false;
}
-
- void done(const AsteriskSCF::Media::V1::StreamSinkSeq& sinks)
- {
- mMaterials->sinks = sinks;
- mListener->succeeded();
- }
-
- void failed(const Ice::Exception&)
- {
- mListener->failed();
- }
-private:
- MediaConnectorBuilderPtr mMaterials;
};
-class GetSources : public QueuedTask
+class GetStreams : public QueuedTask
{
public:
- GetSources(const MediaConnectorBuilderPtr& materials) :
- QueuedTask("GetSources"),
+ GetStreams(const SessionWrapperPtr& session, const MediaConnectorBuilderPtr& materials) :
+ QueuedTask("GetStreams"),
+ mSession(session),
mMaterials(materials)
{
}
@@ -716,14 +618,15 @@ public:
protected:
bool executeImpl()
{
- mMaterials->mediaSession->begin_getSources(newCallback_Session_getSources(this,
- &GetSources::done, &GetSources::failed));
+ mMaterials->sessionPrx->begin_getStreams(
+ AsteriskSCF::SessionCommunications::V1::newCallback_Session_getStreams(this,
+ &GetStreams::done, &GetStreams::failed));
return false;
}
- void done(const AsteriskSCF::Media::V1::StreamSourceSeq& sources)
+ void done(const AsteriskSCF::Media::V1::StreamInformationDict& streams)
{
- mMaterials->sources = sources;
+ mMaterials->streams = streams;
mListener->succeeded();
}
@@ -731,8 +634,8 @@ protected:
{
mListener->failed();
}
-
private:
+ SessionWrapperPtr mSession;
MediaConnectorBuilderPtr mMaterials;
};
@@ -749,8 +652,7 @@ public:
protected:
bool executeImpl()
{
- mMaterials->incomingPairings = mSplicer->findCompatiblePairings(mMaterials->mediaSession, mMaterials->sources);
- mMaterials->outgoingPairings = mSplicer->findCompatiblePairings(mMaterials->mediaSession, mMaterials->sinks);
+ mSplicer->findCompatiblePairings(mMaterials->streams, mMaterials->outgoingPairings, mMaterials->incomingPairings);
return true;
}
@@ -995,9 +897,7 @@ QueuedTasks createMediaConnectTasks(const SessionWrapperPtr& session,
MediaConnectorBuilderPtr materials(new MediaConnectorBuilder);
materials->peer = peer;
materials->sessionPrx = sessionPrx;
- tasks.push_back(new GetMediaSession(session, materials));
- tasks.push_back(new GetSinks(materials));
- tasks.push_back(new GetSources(materials));
+ tasks.push_back(new GetStreams(session, materials));
tasks.push_back(new CreateAndRegisterConnector(session, splicer, materials));
tasks.push_back(new GetCompatiblePairings(splicer, materials));
tasks.push_back(new MakeConnections(materials));
-----------------------------------------------------------------------
--
asterisk-scf/integration/bridging.git
More information about the asterisk-scf-commits
mailing list