[asterisk-scf-commits] asterisk-scf/integration/bridging.git branch "bridgetelephony" created.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Wed Sep 7 17:14:39 CDT 2011
branch "bridgetelephony" has been created
at 235a08493931af8d0602127bfe802b76093bee0e (commit)
- Log -----------------------------------------------------------------
commit 235a08493931af8d0602127bfe802b76093bee0e
Author: Ken Hunt <ken.hunt at digium.com>
Date: Wed Sep 7 17:14:24 2011 -0500
Connect/disconnect the telephony sources and sinks in the bridge as expected.
diff --git a/src/BridgeImpl.cpp b/src/BridgeImpl.cpp
index 42d4b18..63115eb 100755
--- a/src/BridgeImpl.cpp
+++ b/src/BridgeImpl.cpp
@@ -681,6 +681,153 @@ private:
BridgeImplPtr mBridge;
};
+class ConnectTelephonyEventsTask: public QueuedTask
+{
+public:
+ ConnectTelephonyEventsTask(const BridgeImplPtr& bridge,
+ const SessionSeq& newSessions,
+ const Logger& logger)
+ : QueuedTask("ConnectTelephonyEventsTask"),
+ mBridge(bridge),
+ mNewSessions(newSessions),
+ mLogger(logger)
+ {
+ }
+
+protected:
+ bool executeImpl()
+ {
+ // Assumption: All the sessions we need to connect to are already
+ // part of the bridge's session collection.
+ for (SessionSeq::iterator i = mNewSessions.begin(); i != mNewSessions.end(); ++i)
+ {
+ TelephonySessionPrx telephonySession = TelephonySessionPrx::checkedCast(*i);
+
+ if (telephonySession == 0)
+ {
+ continue;
+ }
+
+ ConnectTelephonyOperation op(telephonySession, mLogger);
+ mBridge->getSessions()->visitSessions(op);
+ }
+ return true;
+ }
+
+private:
+ BridgeImplPtr mBridge;
+ SessionSeq mNewSessions;
+ Logger mLogger;
+};
+
+class DisconnectTelephonyEventsTask: public QueuedTask
+{
+public:
+ DisconnectTelephonyEventsTask(const BridgeImplPtr& bridge,
+ const SessionSeq& disconnectingSessions,
+ const Logger& logger)
+ : QueuedTask("DisconnectTelephonyEventsTask"),
+ mBridge(bridge),
+ mDisconnectingSessions(disconnectingSessions),
+ mLogger(logger)
+ {
+ }
+
+protected:
+ bool executeImpl()
+ {
+ for (SessionSeq::iterator i = mDisconnectingSessions.begin(); i != mDisconnectingSessions.end(); ++i)
+ {
+ TelephonySessionPrx telephonySession = TelephonySessionPrx::checkedCast(*i);
+
+ if (telephonySession == 0)
+ {
+ continue;
+ }
+
+ DisconnectTelephonyOperation op(telephonySession, mLogger);
+ mBridge->getSessions()->visitSessions(op);
+ }
+
+ /**
+ * In ConnectTelephonyEventsTask, we could assume all the sessions were in the bridge's session collection
+ * and just use a visitor. But when a session is removed, AMI operations are involved. We use an
+ * extra pass over the set of sessions being removed, and disconnect each one from the others, to be safe.
+ */
+ disconnectMembers();
+
+ return true;
+ }
+
+ /**
+ * Disconnect the members of mDisconnectingSessions from each other.
+ */
+ void disconnectMembers()
+ {
+ for(SessionSeq::iterator i = mDisconnectingSessions.begin();
+ i != mDisconnectingSessions.end(); ++i)
+ {
+ TelephonySessionPrx session1 = TelephonySessionPrx::checkedCast(*i);
+ if (session1 == 0)
+ {
+ // If not a telephony session, nothing to do.
+ continue;
+ }
+
+ disconnectMembersFrom(session1);
+ }
+ }
+
+ void disconnectMembersFrom(TelephonySessionPrx session1)
+ {
+ for(SessionSeq::iterator i = mDisconnectingSessions.begin();
+ i != mDisconnectingSessions.end(); ++i)
+ {
+ if (session1->ice_getIdentity() == (*i)->ice_getIdentity())
+ {
+ // It's not connected to itself.
+ continue;
+ }
+
+ TelephonySessionPrx session2 = TelephonySessionPrx::checkedCast(*i);
+ if (session2== 0)
+ {
+ // If not a telephony session, nothing to do.
+ continue;
+ }
+
+ disconnectSinks(session1, session2);
+ disconnectSinks(session2, session1);
+ }
+ }
+
+ void disconnectSinks
+ (const TelephonySessionPrx& sourceSession,
+ const TelephonySessionPrx& sinkSession)
+ {
+ if (sourceSession->ice_getIdentity() == sinkSession->ice_getIdentity())
+ {
+ // Not connected to ourselves.
+ return;
+ }
+
+ TelephonyEventSinkSeq sinksToRemove = sinkSession->getSinks();
+ TelephonyEventSourceSeq fromSources = sourceSession->getSources();
+
+ for(TelephonyEventSourceSeq::iterator i=fromSources.begin();
+ i != fromSources.end(); ++i)
+ {
+ (*i)->removeSinks(sinksToRemove);
+ }
+ }
+
+private:
+ BridgeImplPtr mBridge;
+ SessionSeq mDisconnectingSessions;
+ Logger mLogger;
+};
+
+
} // End of anonymous namespace
BridgeImpl::BridgeImpl(const string& name, const Ice::ObjectAdapterPtr& adapter,
@@ -742,6 +889,7 @@ void BridgeImpl::addSessions_async(const AMD_Bridge_addSessionsPtr& callback, co
tasks.push_back(new SetAndGetSessionControllerTask(mObjAdapter, mSessions, sessions, this, mLogger));
tasks.push_back(new GenericAMDCallback<AMD_Bridge_addSessionsPtr>(callback, tracker));
tasks.push_back(new SetupMedia(this));
+ tasks.push_back(new ConnectTelephonyEventsTask(this, sessions, mLogger));
tasks.push_back(new UpdateTask(this));
ExecutorPtr executor(new Executor(tasks, mLogger));
executor->start();
@@ -1002,6 +1150,10 @@ void BridgeImpl::replaceSession_async(const AMD_Bridge_replaceSessionPtr& callba
boost::shared_lock<boost::shared_mutex> lock(mLock);
cookies = getCookies();
}
+
+ SessionSeq removedSessions;
+ removedSessions.push_back(sessionToReplace);
+
tasks.push_back(new RemoveSessionsNotify(mListeners, removeTracker, cookies));
tasks.push_back(new UnplugMedia(this));
tasks.push_back(new SetBridgeTask(mSessions, mPrx, mSessionListenerPrx, newSessions, tracker));
@@ -1009,6 +1161,8 @@ void BridgeImpl::replaceSession_async(const AMD_Bridge_replaceSessionPtr& callba
tasks.push_back(new SetAndGetSessionControllerTask(mObjAdapter, mSessions, newSessions, this, mLogger));
tasks.push_back(new GenericAMDCallback<AMD_Bridge_replaceSessionPtr>(callback, tracker));
tasks.push_back(new SetupMedia(this));
+ tasks.push_back(new ConnectTelephonyEventsTask(this, newSessions, mLogger));
+ tasks.push_back(new DisconnectTelephonyEventsTask(this, removedSessions, mLogger));
tasks.push_back(new UpdateTask(this));
ExecutorPtr executor(new Executor(tasks, mLogger));
executor->start();
@@ -1221,6 +1375,7 @@ void BridgeImpl::getAddSessionsTasks(QueuedTasks& tasks,
tasks.push_back(new AddToListeners(mListeners, tracker, getCookies()));
tasks.push_back(new SetAndGetSessionControllerTask(mObjAdapter, mSessions, sessions, this, mLogger));
tasks.push_back(new SetupMedia(this));
+ tasks.push_back(new ConnectTelephonyEventsTask(this, sessions, mLogger));
tasks.push_back(new UpdateTask(this));
}
diff --git a/src/SessionOperations.cpp b/src/SessionOperations.cpp
index e6bf5f8..2f20789 100644
--- a/src/SessionOperations.cpp
+++ b/src/SessionOperations.cpp
@@ -233,3 +233,85 @@ void RemoveStreamsOperation::removed(const SessionWrapperPtr& session)
void RemoveStreamsOperation::failed(const Ice::Exception&, const SessionWrapperPtr&)
{
}
+
+ConnectTelephonyOperation::ConnectTelephonyOperation(const TelephonySessionPrx& sessionToConnect,
+ const Logger& logger)
+ : mSessionToConnect(sessionToConnect),
+ mLogger(logger)
+{
+}
+
+void ConnectTelephonyOperation::operator()(const SessionWrapperPtr& session)
+{
+ if (session->getSession()->ice_getIdentity() == mSessionToConnect->ice_getIdentity())
+ {
+ // Let's not connect a session to itself.
+ return;
+ }
+
+ TelephonySessionPrx visitedTelephonySession = TelephonySessionPrx::checkedCast(session->getSession());
+ if (visitedTelephonySession == 0)
+ {
+ // If the session being visited isn't telephony, nothing to do.
+ return;
+ }
+
+ connectSinks(mSessionToConnect, visitedTelephonySession);
+ connectSinks(visitedTelephonySession, mSessionToConnect);
+
+}
+
+void ConnectTelephonyOperation::connectSinks
+ (const TelephonySessionPrx& sourceSession,
+ const TelephonySessionPrx& sinkSession)
+{
+ TelephonyEventSinkSeq sinksToAdd = sinkSession->getSinks();
+ TelephonyEventSourceSeq toSources = sourceSession->getSources();
+
+ for(TelephonyEventSourceSeq::iterator i=toSources.begin();
+ i != toSources.end(); ++i)
+ {
+ (*i)->addSinks(sinksToAdd);
+ }
+}
+
+DisconnectTelephonyOperation::DisconnectTelephonyOperation(const TelephonySessionPrx& sessionToDisconnect,
+ const Logger& logger)
+ : mSessionToDisconnect(sessionToDisconnect),
+ mLogger(logger)
+{
+}
+
+void DisconnectTelephonyOperation::operator()(const SessionWrapperPtr& visitedSession)
+{
+ if (visitedSession->getSession()->ice_getIdentity() == mSessionToDisconnect->ice_getIdentity())
+ {
+ // Not connected to ourselves.
+ return;
+ }
+
+ TelephonySessionPrx visitedTelephonySession = TelephonySessionPrx::checkedCast(visitedSession->getSession());
+ if (visitedTelephonySession == 0)
+ {
+ // If the session being visited isn't telephony, nothing to do.
+ return;
+ }
+
+ disconnectSinks(mSessionToDisconnect, visitedTelephonySession);
+ disconnectSinks(visitedTelephonySession, mSessionToDisconnect);
+
+}
+
+void DisconnectTelephonyOperation::disconnectSinks
+ (const TelephonySessionPrx& sourceSession,
+ const TelephonySessionPrx& sinkSession)
+{
+ TelephonyEventSinkSeq sinksToRemove = sinkSession->getSinks();
+ TelephonyEventSourceSeq fromSources = sourceSession->getSources();
+
+ for(TelephonyEventSourceSeq::iterator i=fromSources.begin();
+ i != fromSources.end(); ++i)
+ {
+ (*i)->removeSinks(sinksToRemove);
+ }
+}
diff --git a/src/SessionOperations.h b/src/SessionOperations.h
index a4e2c29..fbe432d 100644
--- a/src/SessionOperations.h
+++ b/src/SessionOperations.h
@@ -117,6 +117,40 @@ private:
typedef IceUtil::Handle<AddStreamsOperation> AddStreamsOperationPtr;
+class ConnectTelephonyOperation : public std::unary_function<SessionWrapperPtr, void>, public Ice::Object
+{
+public:
+ ConnectTelephonyOperation(const AsteriskSCF::SessionCommunications::V1::TelephonySessionPrx& sessionToConnect,
+ const AsteriskSCF::System::Logging::Logger& logger);
+
+ void operator()(const SessionWrapperPtr& session);
+
+private:
+ void connectSinks(const AsteriskSCF::SessionCommunications::V1::TelephonySessionPrx& sourceSession,
+ const AsteriskSCF::SessionCommunications::V1::TelephonySessionPrx& sinkSession);
+
+ AsteriskSCF::SessionCommunications::V1::TelephonySessionPrx mSessionToConnect;
+ AsteriskSCF::System::Logging::Logger mLogger;
+};
+typedef IceUtil::Handle<ConnectTelephonyOperation> ConnectTelephonyOperationPtr;
+
+class DisconnectTelephonyOperation : public std::unary_function<SessionWrapperPtr, void>, public Ice::Object
+{
+public:
+ DisconnectTelephonyOperation(const AsteriskSCF::SessionCommunications::V1::TelephonySessionPrx& sessionToDisconnect,
+ const AsteriskSCF::System::Logging::Logger& logger);
+
+ void operator()(const SessionWrapperPtr& session);
+
+private:
+ void disconnectSinks(const AsteriskSCF::SessionCommunications::V1::TelephonySessionPrx& sourceSession,
+ const AsteriskSCF::SessionCommunications::V1::TelephonySessionPrx& sinkSession);
+
+ AsteriskSCF::SessionCommunications::V1::TelephonySessionPrx mSessionToDisconnect;
+ AsteriskSCF::System::Logging::Logger mLogger;
+};
+typedef IceUtil::Handle<DisconnectTelephonyOperation> DisconnectTelephonyOperationPtr;
+
class RemoveStreamsOperation : public std::unary_function<SessionWrapperPtr, void>, public Ice::Object
{
public:
commit 7af97676c33ca5f32dbcf924bd02c2b31c230d4a
Author: Joshua Colp <jcolp at digium.com>
Date: Sun Sep 4 13:45:54 2011 -0300
Add support for establishing direct media connections.
diff --git a/src/BridgeImpl.cpp b/src/BridgeImpl.cpp
index a7b2feb..42d4b18 100755
--- a/src/BridgeImpl.cpp
+++ b/src/BridgeImpl.cpp
@@ -514,6 +514,48 @@ private:
Logger mLogger;
};
+class UnplugMedia : public QueuedTask
+{
+public:
+ UnplugMedia(const BridgeImplPtr& bridge) :
+ QueuedTask("UnplugMedia"),
+ mBridge(bridge)
+ {
+ }
+
+protected:
+ bool executeImpl()
+ {
+ UnplugMediaOperation op;
+ mBridge->getSessions()->visitSessions(op);
+ return true;
+ }
+
+private:
+ BridgeImplPtr mBridge;
+};
+
+class SetupMedia : public QueuedTask
+{
+public:
+ SetupMedia(const BridgeImplPtr& bridge) :
+ QueuedTask("SetupMedia"),
+ mBridge(bridge)
+ {
+ }
+
+protected:
+ bool executeImpl()
+ {
+ SetupMediaOperation op;
+ mBridge->getSessions()->visitSessions(op);
+ return true;
+ }
+
+private:
+ BridgeImplPtr mBridge;
+};
+
class AddToListeners : public QueuedTask
{
public:
@@ -694,10 +736,12 @@ void BridgeImpl::addSessions_async(const AMD_Bridge_addSessionsPtr& callback, co
SessionsTrackerPtr tracker(new SessionsTracker);
QueuedTasks tasks;
+ tasks.push_back(new UnplugMedia(this));
tasks.push_back(new SetBridgeTask(mSessions, mPrx, mSessionListenerPrx, sessions, tracker));
tasks.push_back(new AddToListeners(mListeners, tracker, getCookies()));
tasks.push_back(new SetAndGetSessionControllerTask(mObjAdapter, mSessions, sessions, this, mLogger));
tasks.push_back(new GenericAMDCallback<AMD_Bridge_addSessionsPtr>(callback, tracker));
+ tasks.push_back(new SetupMedia(this));
tasks.push_back(new UpdateTask(this));
ExecutorPtr executor(new Executor(tasks, mLogger));
executor->start();
@@ -959,10 +1003,12 @@ void BridgeImpl::replaceSession_async(const AMD_Bridge_replaceSessionPtr& callba
cookies = getCookies();
}
tasks.push_back(new RemoveSessionsNotify(mListeners, removeTracker, cookies));
+ tasks.push_back(new UnplugMedia(this));
tasks.push_back(new SetBridgeTask(mSessions, mPrx, mSessionListenerPrx, newSessions, tracker));
tasks.push_back(new AddToListeners(mListeners, tracker, cookies));
tasks.push_back(new SetAndGetSessionControllerTask(mObjAdapter, mSessions, newSessions, this, mLogger));
tasks.push_back(new GenericAMDCallback<AMD_Bridge_replaceSessionPtr>(callback, tracker));
+ tasks.push_back(new SetupMedia(this));
tasks.push_back(new UpdateTask(this));
ExecutorPtr executor(new Executor(tasks, mLogger));
executor->start();
@@ -1170,9 +1216,11 @@ void BridgeImpl::getAddSessionsTasks(QueuedTasks& tasks,
const AsteriskSCF::SessionCommunications::V1::SessionSeq& sessions)
{
SessionsTrackerPtr tracker(new SessionsTracker);
+ tasks.push_back(new UnplugMedia(this));
tasks.push_back(new SetBridgeTask(mSessions, mPrx, mSessionListenerPrx, sessions, tracker));
tasks.push_back(new AddToListeners(mListeners, tracker, getCookies()));
tasks.push_back(new SetAndGetSessionControllerTask(mObjAdapter, mSessions, sessions, this, mLogger));
+ tasks.push_back(new SetupMedia(this));
tasks.push_back(new UpdateTask(this));
}
diff --git a/src/MediaSplicer.cpp b/src/MediaSplicer.cpp
index a6a0a1e..2cde267 100755
--- a/src/MediaSplicer.cpp
+++ b/src/MediaSplicer.cpp
@@ -74,6 +74,7 @@ struct MediaConnectorBuilder : public IceUtil::Shared
OutgoingPairings outgoingPairings;
IncomingPairings incomingPairings;
std::map<std::string, std::string> connections;
+ DirectMediaConnectionDict directConnections;
};
typedef IceUtil::Handle<MediaConnectorBuilder> MediaConnectorBuilderPtr;
@@ -495,7 +496,8 @@ public:
void findCompatiblePairings(const StreamInformationDict& streams, vector<OutgoingPairing>& outgoing,
vector<IncomingPairing>& incoming,
MediaConnectorIPtr connector,
- std::map<std::string, std::string>& connections)
+ std::map<std::string, std::string>& connections,
+ DirectMediaConnectionDict& directConnections)
{
// If no streams are present we can not establish any pairings
if (streams.empty())
@@ -549,6 +551,9 @@ public:
StreamSinkSeq sinks = stream->second->sinks;
StreamSourceSeq sources = stream->second->sources;
+ // Attempt to establish a direct connection later
+ directConnections.insert(make_pair(stream->first, theirStream->second->sinks.front()));
+
while (!sinks.empty() && !theirStream->second->sources.empty())
{
outgoing.push_back(OutgoingPairing(sinks.back(), theirStream->second->sources.back()));
@@ -682,7 +687,7 @@ protected:
bool executeImpl()
{
mSplicer->findCompatiblePairings(mMaterials->streams, mMaterials->outgoingPairings, mMaterials->incomingPairings,
- mMaterials->connector, mMaterials->connections);
+ mMaterials->connector, mMaterials->connections, mMaterials->directConnections);
return true;
}
@@ -919,6 +924,67 @@ private:
MediaConnectorBuilderPtr mMaterials;
};
+class MakeDirectConnections : public QueuedTask
+{
+public:
+ MakeDirectConnections(const MediaConnectorBuilderPtr& materials) :
+ QueuedTask("MakeDirectConnections"),
+ mMaterials(materials)
+ {
+ }
+
+protected:
+ bool executeImpl()
+ {
+ mDirectConnection = DirectMediaConnectionPrx::checkedCast(mMaterials->sessionPrx, directMediaConnectionFacet);
+
+ if (!mDirectConnection)
+ {
+ return true;
+ }
+
+ mDirectConnection->begin_checkDirectConnections(mMaterials->directConnections, newCallback_DirectMediaConnection_checkDirectConnections(
+ this, &MakeDirectConnections::accepted, &MakeDirectConnections::failed));
+
+ return false;
+ }
+
+ void accepted(const Ice::StringSeq& streams)
+ {
+ if (streams.empty())
+ {
+ mListener->succeeded();
+ return;
+ }
+
+ DirectMediaConnectionDict directConnections;
+
+ for (Ice::StringSeq::const_iterator stream = streams.begin();
+ stream != streams.end();
+ ++stream)
+ {
+ directConnections.insert(make_pair(*stream, mMaterials->directConnections.find(*stream)->second));
+ }
+
+ mDirectConnection->begin_connectStreams(directConnections, newCallback_DirectMediaConnection_connectStreams(this,
+ &MakeDirectConnections::done,
+ &MakeDirectConnections::failed));
+ }
+
+ void done()
+ {
+ mListener->succeeded();
+ }
+
+ void failed(const Ice::Exception&)
+ {
+ mListener->failed();
+ }
+private:
+ MediaConnectorBuilderPtr mMaterials;
+ DirectMediaConnectionPrx mDirectConnection;
+};
+
QueuedTasks createMediaConnectTasks(const SessionWrapperPtr& session,
const AsteriskSCF::SessionCommunications::V1::SessionPrx& sessionPrx,
const MediaConnectorIPtr& peer, const MediaSplicerIPtr& splicer)
@@ -931,6 +997,7 @@ QueuedTasks createMediaConnectTasks(const SessionWrapperPtr& session,
tasks.push_back(new CreateAndRegisterConnector(session, splicer, materials));
tasks.push_back(new GetCompatiblePairings(splicer, materials));
tasks.push_back(new MakeConnections(materials));
+ tasks.push_back(new MakeDirectConnections(materials));
return tasks;
}
diff --git a/src/SessionOperations.cpp b/src/SessionOperations.cpp
index f7f8508..e6bf5f8 100644
--- a/src/SessionOperations.cpp
+++ b/src/SessionOperations.cpp
@@ -23,6 +23,16 @@ using namespace AsteriskSCF::Replication::BridgeService::V1;
using namespace AsteriskSCF;
using namespace std;
+void UnplugMediaOperation::operator()(const SessionWrapperPtr& s)
+{
+ s->unplugMedia();
+}
+
+void SetupMediaOperation::operator()(const SessionWrapperPtr& s)
+{
+ s->setupMedia();
+}
+
ConnectSessionOperation::ConnectSessionOperation(const SessionPrx& exclude, const Logger& logger) :
mExclude(exclude->ice_getIdentity()),
mLogger(logger)
diff --git a/src/SessionOperations.h b/src/SessionOperations.h
index ce73388..a4e2c29 100644
--- a/src/SessionOperations.h
+++ b/src/SessionOperations.h
@@ -27,6 +27,22 @@ namespace AsteriskSCF
namespace BridgeService
{
+class UnplugMediaOperation : public std::unary_function<SessionWrapperPtr, void>
+{
+public:
+ void operator()(const SessionWrapperPtr& wrapper);
+
+private:
+};
+
+class SetupMediaOperation : public std::unary_function<SessionWrapperPtr, void>
+{
+public:
+ void operator()(const SessionWrapperPtr& wrapper);
+
+private:
+};
+
class ConnectSessionOperation : public std::unary_function<SessionWrapperPtr, void>
{
public:
commit 7ae703dcaf863f700b577ec6796e3ef81bbb0ebf
Author: Joshua Colp <jcolp at digium.com>
Date: Sun Sep 4 13:08:41 2011 -0300
Add support for multiple streams using the SessionController interface.
diff --git a/slice/AsteriskSCF/Replication/BridgeService/BridgeReplicatorIf.ice b/slice/AsteriskSCF/Replication/BridgeService/BridgeReplicatorIf.ice
index 5a7892c..fb0b7ec 100644
--- a/slice/AsteriskSCF/Replication/BridgeService/BridgeReplicatorIf.ice
+++ b/slice/AsteriskSCF/Replication/BridgeService/BridgeReplicatorIf.ice
@@ -97,7 +97,7 @@ class SessionPairing extends ReplicatedStateItem
{
string bridgeKey;
string sessionKey;
- AsteriskSCF::Media::V1::Session* mediaSession;
+ AsteriskSCF::Media::V1::StreamInformationDict streams;
MediaPairingSeq incomingMediaPairings;
MediaPairingSeq outgoingMediaPairings;
};
diff --git a/src/BridgeImpl.cpp b/src/BridgeImpl.cpp
index a9cd9e1..a7b2feb 100755
--- a/src/BridgeImpl.cpp
+++ b/src/BridgeImpl.cpp
@@ -78,7 +78,6 @@ public:
// AsteriskSCF::SessionCommunications::Bridging::Bridge Interface
//
void addSessions_async(const AMD_Bridge_addSessionsPtr& callback, const SessionSeq& sessions, const Ice::Current&);
- void removeSessions(const SessionSeq& sessions, const Ice::Current& current);
void removeSessions_async(const AMD_Bridge_removeSessionsPtr& callback, const SessionSeq& sessions,
const Ice::Current&);
@@ -381,6 +380,140 @@ private:
SessionsTrackerPtr mTracker;
};
+class BridgeSessionController : public SessionController
+{
+public:
+ BridgeSessionController(const BridgeImplPtr& bridge,
+ const SessionWrapperPtr& self,
+ const Logger& logger) : mBridge(bridge), mSelf(self), mLogger(logger) { }
+
+ void changeStreamStates_async(const AsteriskSCF::SessionCommunications::V1::AMD_SessionController_changeStreamStatesPtr& cb,
+ const AsteriskSCF::Media::V1::StreamStateDict&, const Ice::Current&)
+ {
+ // We do not care about stream state changes at this point in time
+ cb->ice_response();
+ }
+
+ void addStreams_async(const AsteriskSCF::SessionCommunications::V1::AMD_SessionController_addStreamsPtr& cb,
+ const AsteriskSCF::Media::V1::StreamInformationDict& streams, const Ice::Current&)
+ {
+ AddStreamsOperationPtr op = new AddStreamsOperation(cb, mSelf, streams, mLogger);
+ mBridge->getSessions()->visitSessions(*op);
+ }
+
+ void removeStreams_async(const AsteriskSCF::SessionCommunications::V1::AMD_SessionController_removeStreamsPtr& cb,
+ const AsteriskSCF::Media::V1::StreamInformationDict& streams, const Ice::Current&)
+ {
+ RemoveStreamsOperation op(mSelf, streams);
+ mBridge->getSessions()->visitSessions(op);
+ cb->ice_response();
+ }
+
+private:
+ BridgeImplPtr mBridge;
+ SessionWrapperPtr mSelf;
+ Logger mLogger;
+};
+
+class SetAndGetSessionControllerTask : public QueuedTask
+{
+public:
+ SetAndGetSessionControllerTask(const Ice::ObjectAdapterPtr& adapter,
+ const SessionCollectionPtr& sessionCollection,
+ const SessionSeq& sessions,
+ const BridgeImplPtr& bridge,
+ const Logger& logger):
+ QueuedTask("SetAndGetSessionControllerTask"),
+ mAdapter(adapter),
+ mSessionManager(sessionCollection),
+ mSessions(sessions),
+ mTracker(new SessionsTracker()),
+ mBridge(bridge),
+ mLogger(logger)
+ {
+ }
+
+protected:
+ bool executeImpl()
+ {
+ bool tasksDispatched = false;
+ for (SessionSeq::iterator i = mSessions.begin(); i != mSessions.end(); ++i)
+ {
+ SessionWrapperPtr session = mSessionManager->getSession(*i);
+ if (session == 0)
+ {
+ //
+ // This shouldn't happen!
+ //
+ mTracker->addExceptionMessage(*i, "session not found");
+ continue;
+ }
+ tasksDispatched = true;
+ SessionControllerPtr controller = new BridgeSessionController(mBridge, session, mLogger);
+ std::string identity = (*i)->ice_getIdentity().name;
+ identity += ".bridgecontroller";
+ SessionControllerPrx controllerPrx =
+ SessionControllerPrx::uncheckedCast(mAdapter->add(controller, mAdapter->getCommunicator()->stringToIdentity(identity)));
+ (*i)->begin_setAndGetSessionController(controllerPrx, newCallback_Session_setAndGetSessionController(this,
+ &SetAndGetSessionControllerTask::get, &SetAndGetSessionControllerTask::failed), session);
+ }
+ return !tasksDispatched;
+ }
+
+ void get(const SessionControllerPrx& controller, const SessionWrapperPtr& session)
+ {
+ session->setSessionController(controller);
+ mTracker->add(session->getSession());
+ if (mTracker->responseCount() == mSessions.size())
+ {
+ mListener->succeeded();
+ }
+ }
+
+ void failed(const Ice::Exception& ex, const SessionWrapperPtr& session)
+ {
+ //
+ // TODO:
+ // * Log exception.
+ // * Rollback
+ // * Interpret exception and decide whether or not to fail the entire operation.
+ // Currently the semantics allow some operations to fail.
+ //
+ mTracker->addException(session->getSession(), ex);
+ try
+ {
+ //
+ // We want to make sure that session isn't left lying around. The session collection will
+ // take care of cleaning it up as long as it is marked as destroyed.
+ //
+ session->destroy();
+ }
+ catch(...)
+ {
+ }
+ if (mTracker->responseCount() == mSessions.size())
+ {
+ SessionErrorSeq exceptions = mTracker->getExceptions();
+ if (exceptions.size() == mSessions.size())
+ {
+ mListener->failed();
+ }
+ else
+ {
+ mListener->succeeded();
+ }
+ }
+ }
+
+private:
+ Ice::ObjectAdapterPtr mAdapter;
+ SessionCollectionPtr mSessionManager;
+ SessionSeq mSessions;
+ SessionsTrackerPtr mTracker;
+ BridgeImplPtr mBridge;
+ Logger mLogger;
+};
+
class AddToListeners : public QueuedTask
{
public:
@@ -563,6 +696,7 @@ void BridgeImpl::addSessions_async(const AMD_Bridge_addSessionsPtr& callback, co
QueuedTasks tasks;
tasks.push_back(new SetBridgeTask(mSessions, mPrx, mSessionListenerPrx, sessions, tracker));
tasks.push_back(new AddToListeners(mListeners, tracker, getCookies()));
+ tasks.push_back(new SetAndGetSessionControllerTask(mObjAdapter, mSessions, sessions, this, mLogger));
tasks.push_back(new GenericAMDCallback<AMD_Bridge_addSessionsPtr>(callback, tracker));
tasks.push_back(new UpdateTask(this));
ExecutorPtr executor(new Executor(tasks, mLogger));
@@ -827,6 +961,7 @@ void BridgeImpl::replaceSession_async(const AMD_Bridge_replaceSessionPtr& callba
tasks.push_back(new RemoveSessionsNotify(mListeners, removeTracker, cookies));
tasks.push_back(new SetBridgeTask(mSessions, mPrx, mSessionListenerPrx, newSessions, tracker));
tasks.push_back(new AddToListeners(mListeners, tracker, cookies));
+ tasks.push_back(new SetAndGetSessionControllerTask(mObjAdapter, mSessions, newSessions, this, mLogger));
tasks.push_back(new GenericAMDCallback<AMD_Bridge_replaceSessionPtr>(callback, tracker));
tasks.push_back(new UpdateTask(this));
ExecutorPtr executor(new Executor(tasks, mLogger));
@@ -1037,6 +1172,7 @@ void BridgeImpl::getAddSessionsTasks(QueuedTasks& tasks,
SessionsTrackerPtr tracker(new SessionsTracker);
tasks.push_back(new SetBridgeTask(mSessions, mPrx, mSessionListenerPrx, sessions, tracker));
tasks.push_back(new AddToListeners(mListeners, tracker, getCookies()));
+ tasks.push_back(new SetAndGetSessionControllerTask(mObjAdapter, mSessions, sessions, this, mLogger));
tasks.push_back(new UpdateTask(this));
}
diff --git a/src/MediaSplicer.cpp b/src/MediaSplicer.cpp
index e4992fb..a6a0a1e 100755
--- a/src/MediaSplicer.cpp
+++ b/src/MediaSplicer.cpp
@@ -68,13 +68,12 @@ typedef vector<IncomingPairing> IncomingPairings;
struct MediaConnectorBuilder : public IceUtil::Shared
{
AsteriskSCF::SessionCommunications::V1::SessionPrx sessionPrx;
- AsteriskSCF::Media::V1::SessionPrx mediaSession;
- AsteriskSCF::Media::V1::StreamSourceSeq sources;
- AsteriskSCF::Media::V1::StreamSinkSeq sinks;
+ AsteriskSCF::Media::V1::StreamInformationDict streams;
MediaConnectorIPtr peer;
MediaConnectorIPtr connector;
OutgoingPairings outgoingPairings;
IncomingPairings incomingPairings;
+ std::map<std::string, std::string> connections;
};
typedef IceUtil::Handle<MediaConnectorBuilder> MediaConnectorBuilderPtr;
@@ -91,9 +90,9 @@ public:
const std::string& sessionId,
const ReplicatorSmartPrx& replicator,
const Logger& logger) :
- mMedia(data->mediaSession),
mOutgoing(data->outgoingPairings),
mIncoming(data->incomingPairings),
+ mStreams(data->streams),
mPeer(data->peer),
mConnected(true),
mBridgeId(bridgeId),
@@ -111,7 +110,7 @@ public:
const MediaConnectorPtr& peer,
const ReplicatorSmartPrx& replicator,
const Logger& logger) :
- mMedia(pairing->mediaSession),
+ mStreams(pairing->streams),
mPeer(peer),
mConnected(true),
mBridgeId(pairing->bridgeKey),
@@ -165,7 +164,7 @@ public:
}
void update(const MediaConnectorPtr& peer, const OutgoingPairings& outgoing,
- const IncomingPairings& incoming)
+ const IncomingPairings& incoming, const std::map<std::string, std::string>& newConnections)
{
mLogger(Debug) << FUNLOG << ": establishing a new peer connection.";
SessionPairingPtr currentState;
@@ -174,6 +173,7 @@ public:
mPeer = peer;
mOutgoing = outgoing;
mIncoming = incoming;
+ mConnections = newConnections;
currentState = createUpdate();
}
pushUpdate(currentState);
@@ -184,7 +184,7 @@ public:
if (mReplicator)
{
SessionPairingPtr currentState(new SessionPairing);
- currentState->mediaSession = mMedia;
+ currentState->streams = mStreams;
currentState->bridgeKey = mBridgeId;
currentState->sessionKey = mSessionId;
currentState->key = mKey;
@@ -230,9 +230,13 @@ public:
void destroy()
{
SessionPairingPtr newState;
+ vector<OutgoingPairing> outgoing;
+ vector<IncomingPairing> incoming;
{
boost::unique_lock<boost::shared_mutex> lock(mLock);
+ outgoing = mOutgoing;
mOutgoing.clear();
+ incoming = mIncoming;
mIncoming.clear();
mConnected = false;
newState = createUpdate();
@@ -247,37 +251,32 @@ public:
// its media session constituents to stop communicating with anything
// else. This also used to try one-ways, that's probably a bad idea too.
//
- try
+ for (vector<OutgoingPairing>::const_iterator pairing = outgoing.begin();
+ pairing != outgoing.end();
+ ++pairing)
{
- StreamSourceSeq sources = mMedia->getSources();
- for (StreamSourceSeq::const_iterator i = sources.begin(); i != sources.end(); ++i)
+ try
{
- try
- {
-// (*i)->setSink(StreamSinkPrx());
-// TODO: call removeSink
- }
- catch (const Ice::Exception& ex)
- {
- mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
- }
+ pairing->first->setSource(StreamSourcePrx());
}
- StreamSinkSeq sinks = mMedia->getSinks();
- for (StreamSinkSeq::const_iterator i = sinks.begin(); i != sinks.end(); ++i)
+ catch (const Ice::Exception& ex)
{
- try
- {
- (*i)->setSource(StreamSourcePrx());
- }
- catch (const Ice::Exception& ex)
- {
- mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
- }
+ mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
}
}
- catch (const Ice::Exception& ex)
+
+ for (vector<IncomingPairing>::const_iterator pairing = incoming.begin();
+ pairing != incoming.end();
+ ++pairing)
{
- mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
+ try
+ {
+ pairing->first->removeSink(pairing->second);
+ }
+ catch (const Ice::Exception& ex)
+ {
+ mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
+ }
}
}
@@ -301,9 +300,13 @@ public:
{
mLogger(Debug) << FUNLOG << ": unplugging sinks and sources.";
SessionPairingPtr newState;
+ vector<OutgoingPairing> outgoing;
+ vector<IncomingPairing> incoming;
{
boost::unique_lock<boost::shared_mutex> lock(mLock);
+ outgoing = mOutgoing;
mOutgoing.clear();
+ incoming = mIncoming;
mIncoming.clear();
mConnected = false;
newState = createUpdate();
@@ -318,37 +321,32 @@ public:
// its media session constituents to stop communicating with anything
// else. This also used to try one-ways, that's probably a bad idea too.
//
- try
+ for (vector<OutgoingPairing>::const_iterator pairing = outgoing.begin();
+ pairing != outgoing.end();
+ ++pairing)
{
- StreamSourceSeq sources = mMedia->getSources();
- for (StreamSourceSeq::const_iterator i = sources.begin(); i != sources.end(); ++i)
+ try
{
- try
- {
-// (*i)->setSink(StreamSinkPrx());
-// TODO: Call removeSink
- }
- catch (const Ice::Exception& ex)
- {
- mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
- }
+ pairing->first->setSource(StreamSourcePrx());
}
- StreamSinkSeq sinks = mMedia->getSinks();
- for (StreamSinkSeq::const_iterator i = sinks.begin(); i != sinks.end(); ++i)
+ catch (const Ice::Exception& ex)
{
- try
- {
- (*i)->setSource(StreamSourcePrx());
- }
- catch (const Ice::Exception& ex)
- {
- mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
- }
+ mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
}
}
- catch (const Ice::Exception& ex)
+
+ for (vector<IncomingPairing>::const_iterator pairing = incoming.begin();
+ pairing != incoming.end();
+ ++pairing)
{
- mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
+ try
+ {
+ pairing->first->removeSink(pairing->second);
+ }
+ catch (const Ice::Exception& ex)
+ {
+ mLogger(Debug) << FUNLOG << ":" << __LINE__ << ": exception, thought you would like to know " << ex.what();
+ }
}
return true;
@@ -359,11 +357,21 @@ public:
return mKey;
}
+ StreamInformationDict streams()
+ {
+ return mStreams;
+ }
+
+ std::map<std::string, std::string> connections()
+ {
+ return mConnections;
+ }
+
private:
boost::shared_mutex mLock;
- AsteriskSCF::Media::V1::SessionPrx mMedia;
vector<OutgoingPairing> mOutgoing;
vector<IncomingPairing> mIncoming;
+ StreamInformationDict mStreams;
MediaConnectorPtr mPeer;
bool mConnected;
string mBridgeId;
@@ -372,6 +380,7 @@ private:
Ice::Long mRepCounter;
ReplicatorSmartPrx mReplicator;
Logger mLogger;
+ std::map<std::string, std::string> mConnections;
};
QueuedTasks createMediaConnectTasks(const SessionWrapperPtr& session,
@@ -404,11 +413,11 @@ public:
//
if (data->peer)
{
- data->peer->update(connector, data->outgoingPairings, data->incomingPairings);
+ data->peer->update(connector, data->outgoingPairings, data->incomingPairings, data->connections);
}
connector->initialUpdate();
boost::unique_lock<boost::shared_mutex> lock(mLock);
- mSessions.push_back(MediaSessionStruct(data->mediaSession, connector));
+ mConnectors.push_back(connector);
return connector;
}
@@ -422,13 +431,13 @@ public:
//
// Reap.
//
- MediaSessions::iterator i = mSessions.begin();
- while (i != mSessions.end())
+ MediaConnectors::iterator i = mConnectors.begin();
+ while (i != mConnectors.end())
{
- if (!i->connector->isConnected())
+ if (!(*i)->isConnected())
{
mLogger(Debug) << FUNLOG << ": reaping a connector";
- i = mSessions.erase(i);
+ i = mConnectors.erase(i);
continue;
}
++i;
@@ -439,9 +448,9 @@ public:
// one of them is going to have an empty connector.
//
MediaConnectorIPtr existing;
- if (mSessions.size() == 1)
+ if (mConnectors.size() == 1)
{
- existing = mSessions.back().connector;
+ existing = mConnectors.back();
existing->clearConnections();
}
@@ -459,126 +468,112 @@ public:
boost::unique_lock<boost::shared_mutex> lock(mLock);
MediaConnectorIPtr result;
- MediaSessions::iterator i = mSessions.begin();
- while (i != mSessions.end())
+ MediaConnectors::iterator i = mConnectors.begin();
+ while (i != mConnectors.end())
{
- if (!i->connector->isConnected())
+ if (!(*i)->isConnected())
{
mLogger(Debug) << FUNLOG << ": reaping a connector";
- MediaSessions::iterator t = i;
- i = mSessions.erase(t);
+ MediaConnectors::iterator t = i;
+ i = mConnectors.erase(t);
continue;
}
++i;
}
MediaConnectorPtr existing;
- if (mSessions.size() == 1)
+ if (mConnectors.size() == 1)
{
- existing = mSessions.back().connector;
+ existing = mConnectors.back();
existing->clearConnections();
}
result = new MediaConnectorI(pairings, existing, mReplicator, mLogger);
- mSessions.push_back(MediaSessionStruct(pairings->mediaSession, result));
+ mConnectors.push_back(result);
return result;
}
- vector<OutgoingPairing> findCompatiblePairings(const AsteriskSCF::Media::V1::SessionPrx& sessionToBeConnected, const StreamSinkSeq& sinks)
+ void findCompatiblePairings(const StreamInformationDict& streams, vector<OutgoingPairing>& outgoing,
+ vector<IncomingPairing>& incoming,
+ MediaConnectorIPtr connector,
+ std::map<std::string, std::string>& connections)
{
- vector<StreamSourceSeq> allSources;
-
+ // If no streams are present we can not establish any pairings
+ if (streams.empty())
{
- boost::shared_lock<boost::shared_mutex> lock(mLock);
- //
- // TODO: This is not really correct at all. When the bridge implements conferencing chances, everything
- // will come into a mix and go out.
- //
- for (MediaSessions::iterator i = mSessions.begin(); i != mSessions.end(); ++i)
- {
- if (i->mediaSession->ice_getIdentity() == sessionToBeConnected->ice_getIdentity())
- {
- continue;
- }
- allSources.push_back(i->mediaSession->getSources());
- }
+ return;
}
- vector<OutgoingPairing> result;
- for (StreamSinkSeq::const_iterator sink = sinks.begin(); sink != sinks.end(); ++sink)
+ boost::shared_lock<boost::shared_mutex> lock(mLock);
+
+ for (MediaConnectors::const_iterator i = mConnectors.begin(); i != mConnectors.end(); ++i)
{
- bool connected = false;
- for (vector<StreamSourceSeq>::iterator sources = allSources.begin(); sources != allSources.end() && !connected;
- ++sources)
+ if ((*i)->id() == connector->id())
{
- for (StreamSourceSeq::iterator source = sources->begin(); source != sources->end(); ++source)
- {
- if (canAdapt(*sink, *source))
- {
- result.push_back(OutgoingPairing(*sink, *source));
- sources->erase(source);
- connected = true;
- break;
- }
- }
+ continue;
}
- if (!connected)
+
+ StreamInformationDict theirStreams = (*i)->streams();
+
+ // If no streams are present go ahead and skip it
+ if (theirStreams.empty())
{
- //
- // TODO: We couldn't establish a proper connection for this sink. What happens
- // here is probably a policy thing.
- //
+ continue;
}
- }
- return result;
- }
- vector<IncomingPairing> findCompatiblePairings(const AsteriskSCF::Media::V1::SessionPrx& sessionToBeConnected, const StreamSourceSeq& sources)
- {
- vector<StreamSinkSeq> allSinks;
- {
- boost::shared_lock<boost::shared_mutex> lock(mLock);
- //
- // TODO: This is not really correct at all. When the bridge implements conferencing chances, everything
- // will come into a mix and go out.
- //
- for (MediaSessions::iterator i = mSessions.begin(); i != mSessions.end(); ++i)
+ // Iterate our own streams looking for any that can be connected to this session
+ for (StreamInformationDict::const_iterator stream = streams.begin();
+ stream != streams.end();
+ ++stream)
{
- if (i->mediaSession->ice_getIdentity() == sessionToBeConnected->ice_getIdentity())
+ if (stream->second->state == Removed)
{
continue;
}
- allSinks.push_back(i->mediaSession->getSinks());
- }
- }
- vector<IncomingPairing> result;
- for (StreamSourceSeq::const_iterator source = sources.begin(); source != sources.end(); ++source)
- {
- bool connected = false;
- for (vector<StreamSinkSeq>::iterator sinks = allSinks.begin(); sinks != allSinks.end() && !connected; ++sinks)
- {
- for (StreamSinkSeq::iterator sink = sinks->begin(); sink != sinks->end(); ++sink)
+ for (StreamInformationDict::const_iterator theirStream = theirStreams.begin();
+ theirStream != theirStreams.end();
+ ++theirStream)
{
- if (canAdapt(*source, *sink))
+ if (theirStream->second->state == Removed)
{
- result.push_back(IncomingPairing(*source, *sink));
- sink = sinks->erase(sink);
- connected = true;
- break;
+ continue;
}
+
+ // Is this stream compatible with the other stream?
+ if (haveCommonFormats(stream->second, theirStream->second) == false)
+ {
+ continue;
+ }
+
+ // Connect all available sources and sinks together
+ StreamSinkSeq sinks = stream->second->sinks;
+ StreamSourceSeq sources = stream->second->sources;
+
+ while (!sinks.empty() && !theirStream->second->sources.empty())
+ {
+ outgoing.push_back(OutgoingPairing(sinks.back(), theirStream->second->sources.back()));
+ sinks.pop_back();
+ theirStream->second->sources.pop_back();
+ }
+
+ while (!sources.empty() && !theirStream->second->sinks.empty())
+ {
+ incoming.push_back(IncomingPairing(sources.back(), theirStream->second->sinks.back()));
+ sources.pop_back();
+ theirStream->second->sinks.pop_back();
+ }
+
+ // Record this stream connection
+ connections.insert(make_pair(theirStream->first, stream->first));
+
+ // This stream has been connected so no longer consider it
+ theirStreams.erase(theirStream->first);
+ break;
}
}
- if (!connected)
- {
- //
- // TODO: We couldn't establish a proper connection for this source! What happens now
- // should be a matter of policy.
- //
- }
}
- return result;
}
-
+
private:
boost::shared_mutex mLock;
@@ -587,30 +582,9 @@ private:
ReplicatorSmartPrx mReplicator;
Logger mLogger;
- struct MediaSessionStruct
- {
- SessionPrx mediaSession;
- MediaConnectorIPtr connector;
-
- MediaSessionStruct(const SessionPrx& session, const MediaConnectorIPtr& con) :
- mediaSession(session),
- connector(con)
- {
- }
- };
-
- typedef vector<MediaSessionStruct> MediaSessions;
+ typedef vector<MediaConnectorIPtr> MediaConnectors;
- MediaSessions mSessions;
-
- bool needMixing()
- {
- //
- // TODO: We are only supporting single stream with two endpoints for the moment, so no mixing will
- // be required.
- //
- return false;
- }
+ MediaConnectors mConnectors;
bool hasPreferredCodecOverride()
{
@@ -622,93 +596,50 @@ private:
return false;
}
- bool canAdapt(const Media::V1::StreamSourcePrx&, const Media::V1::StreamSinkPrx&)
- {
- //
- // For the moment we are assuming two streams that can definitely talk to each other.
- //
- return true;
- }
-
- bool canAdapt(const Media::V1::StreamSinkPrx&, const Media::V1::StreamSourcePrx&)
+ bool canAdapt(const StreamInformationPtr&, const StreamInformationPtr&)
{
- //
- // For the moment we are assuming two streams that can definitely talk to each other.
- //
- return true;
- }
-};
-
-class GetMediaSession : public QueuedTask
-{
-public:
- GetMediaSession(const SessionWrapperPtr& session, const MediaConnectorBuilderPtr& materials) :
- QueuedTask("GetMediaSession"),
- mSession(session),
- mMaterials(materials)
- {
- }
-
-protected:
- bool executeImpl()
- {
- mMaterials->sessionPrx->begin_getMediaSession(
- AsteriskSCF::SessionCommunications::V1::newCallback_Session_getMediaSession(this,
- &GetMediaSession::done, &GetMediaSession::failed));
+ // For the moment we can not transcode
return false;
}
- void done(const AsteriskSCF::Media::V1::SessionPrx& mediaSession)
+ bool haveCommonFormats(const StreamInformationPtr& stream0, const StreamInformationPtr& stream1)
{
- mMaterials->mediaSession = mediaSession;
- mListener->succeeded();
- }
+ for (FormatSeq::const_iterator format0 = stream0->formats.begin();
+ format0 != stream0->formats.end();
+ ++format0)
+ {
+ for (FormatSeq::const_iterator format1 = stream1->formats.begin();
+ format1 != stream1->formats.end();
+ ++format1)
+ {
+ // Is the name alike?
+ if ((*format0)->name != (*format1)->name)
+ {
+ continue;
+ }
- void failed(const Ice::Exception&)
- {
- mListener->failed();
- }
-private:
- SessionWrapperPtr mSession;
- MediaConnectorBuilderPtr mMaterials;
-};
+ // Is a format operations service available, and if so are they compatible?
+ if ((*format0)->operations &&
+ (((*format0)->operations != (*format1)->operations ||
+ ((*format0)->operations->checkCompatible((*format0), (*format1)) == false))))
+ {
+ continue;
+ }
-class GetSinks : public QueuedTask
-{
-public:
- GetSinks(const MediaConnectorBuilderPtr& materials) :
- QueuedTask("GetSinks"),
- mMaterials(materials)
- {
- }
+ return true;
+ }
+ }
-protected:
- bool executeImpl()
- {
- mMaterials->mediaSession->begin_getSinks(newCallback_Session_getSinks(this,
- &GetSinks::done, &GetSinks::failed));
return false;
}
-
- void done(const AsteriskSCF::Media::V1::StreamSinkSeq& sinks)
- {
- mMaterials->sinks = sinks;
- mListener->succeeded();
- }
-
- void failed(const Ice::Exception&)
- {
- mListener->failed();
- }
-private:
- MediaConnectorBuilderPtr mMaterials;
};
-class GetSources : public QueuedTask
+class GetStreams : public QueuedTask
{
public:
- GetSources(const MediaConnectorBuilderPtr& materials) :
- QueuedTask("GetSources"),
+ GetStreams(const SessionWrapperPtr& session, const MediaConnectorBuilderPtr& materials) :
+ QueuedTask("GetStreams"),
+ mSession(session),
mMaterials(materials)
{
}
@@ -716,14 +647,15 @@ public:
protected:
bool executeImpl()
{
- mMaterials->mediaSession->begin_getSources(newCallback_Session_getSources(this,
- &GetSources::done, &GetSources::failed));
+ mMaterials->sessionPrx->begin_getStreams(
+ AsteriskSCF::SessionCommunications::V1::newCallback_Session_getStreams(this,
+ &GetStreams::done, &GetStreams::failed));
return false;
}
- void done(const AsteriskSCF::Media::V1::StreamSourceSeq& sources)
+ void done(const AsteriskSCF::Media::V1::StreamInformationDict& streams)
{
- mMaterials->sources = sources;
+ mMaterials->streams = streams;
mListener->succeeded();
}
@@ -731,8 +663,8 @@ protected:
{
mListener->failed();
}
-
private:
+ SessionWrapperPtr mSession;
MediaConnectorBuilderPtr mMaterials;
};
@@ -749,8 +681,8 @@ public:
protected:
bool executeImpl()
{
- mMaterials->incomingPairings = mSplicer->findCompatiblePairings(mMaterials->mediaSession, mMaterials->sources);
- mMaterials->outgoingPairings = mSplicer->findCompatiblePairings(mMaterials->mediaSession, mMaterials->sinks);
+ mSplicer->findCompatiblePairings(mMaterials->streams, mMaterials->outgoingPairings, mMaterials->incomingPairings,
+ mMaterials->connector, mMaterials->connections);
return true;
}
@@ -897,7 +829,7 @@ protected:
{
if (mMaterials->connector)
{
- mMaterials->connector->update(mMaterials->peer, mMaterials->outgoingPairings, mMaterials->incomingPairings);
+ mMaterials->connector->update(mMaterials->peer, mMaterials->outgoingPairings, mMaterials->incomingPairings, mMaterials->connections);
}
mListener->succeeded();
}
@@ -919,7 +851,7 @@ protected:
{
if (mMaterials->connector)
{
- mMaterials->connector->update(mMaterials->peer, mMaterials->outgoingPairings, mMaterials->incomingPairings);
+ mMaterials->connector->update(mMaterials->peer, mMaterials->outgoingPairings, mMaterials->incomingPairings, mMaterials->connections);
}
mListener->succeeded();
}
@@ -995,9 +927,7 @@ QueuedTasks createMediaConnectTasks(const SessionWrapperPtr& session,
MediaConnectorBuilderPtr materials(new MediaConnectorBuilder);
materials->peer = peer;
materials->sessionPrx = sessionPrx;
- tasks.push_back(new GetMediaSession(session, materials));
- tasks.push_back(new GetSinks(materials));
- tasks.push_back(new GetSources(materials));
+ tasks.push_back(new GetStreams(session, materials));
tasks.push_back(new CreateAndRegisterConnector(session, splicer, materials));
tasks.push_back(new GetCompatiblePairings(splicer, materials));
tasks.push_back(new MakeConnections(materials));
diff --git a/src/MediaSplicer.h b/src/MediaSplicer.h
index e00b2a9..31b3f19 100644
--- a/src/MediaSplicer.h
+++ b/src/MediaSplicer.h
@@ -72,6 +72,11 @@ public:
* Replication support function.
**/
virtual void update(const AsteriskSCF::Replication::BridgeService::V1::SessionPairingPtr& update) = 0;
+
+ /**
+ * Return connection information.
+ */
+ virtual std::map<std::string, std::string> connections() = 0;
};
typedef IceUtil::Handle<MediaConnector> MediaConnectorPtr;
diff --git a/src/SessionOperations.cpp b/src/SessionOperations.cpp
index 58670a9..f7f8508 100644
--- a/src/SessionOperations.cpp
+++ b/src/SessionOperations.cpp
@@ -100,3 +100,126 @@ void RelayIndication::operator()(const SessionWrapperPtr& session)
}
}
}
+
+AddStreamsOperation::AddStreamsOperation(const AsteriskSCF::SessionCommunications::V1::AMD_SessionController_addStreamsPtr& cb,
+ const SessionWrapperPtr& self,
+ const AsteriskSCF::Media::V1::StreamInformationDict& streams,
+ const Logger& logger) :
+ mCb(cb),
+ mSelf(self),
+ mStreams(streams),
+ mIgnore(false),
+ mLogger(logger)
+{
+}
+
+void AddStreamsOperation::operator()(const SessionWrapperPtr& session)
+{
+ if (mIgnore == true || mSelf == session)
+ {
+ return;
+ }
+
+ SessionControllerPrx controller = session->getSessionController();
+
+ if (!controller)
+ {
+ return;
+ }
+
+ // Go ahead and request that the streams be added
+ controller->begin_addStreams(mStreams, newCallback_SessionController_addStreams(this,
+ &AddStreamsOperation::added,
+ &AddStreamsOperation::failed),
+ session);
+
+ // We only allow requesting streams on a single session at this point
+ mIgnore = true;
+}
+
+void AddStreamsOperation::added(const AsteriskSCF::Media::V1::StreamInformationDict& streams,
+ const SessionWrapperPtr& session)
+{
+ // If any streams were added set up media so it flows
+ if (!streams.empty())
+ {
+ session->unplugMedia();
+ mSelf->unplugMedia();
+ session->setupMedia();
+ mSelf->setupMedia();
+ }
+
+ // Now that everything is ready go ahead and respond
+ mCb->ice_response(streams);
+}
+
+void AddStreamsOperation::failed(const Ice::Exception& ex, const SessionWrapperPtr& session)
+{
+ // Since this failed tell the session that initiated this operation that no streams were added
+ mCb->ice_response(AsteriskSCF::Media::V1::StreamInformationDict());
+
+ mLogger(Debug) << FUNLOG << ": (" << session->getSession() << ") " << ex.what();
+}
+
+RemoveStreamsOperation::RemoveStreamsOperation(const SessionWrapperPtr& self,
+ const AsteriskSCF::Media::V1::StreamInformationDict& streams) :
+ mSelf(self),
+ mStreams(streams),
+ mIgnore(false)
+{
+}
+
+void RemoveStreamsOperation::operator()(const SessionWrapperPtr& session)
+{
+ if (mIgnore == true || mSelf == session)
+ {
+ return;
+ }
+
+ AsteriskSCF::Media::V1::StreamInformationDict remove;
+ std::map<std::string, std::string> connections = session->connections();
+
+ // Determine the streams we actually need to remove
+ for (AsteriskSCF::Media::V1::StreamInformationDict::const_iterator stream = mStreams.begin();
+ stream != mStreams.end();
+ ++stream)
+ {
+ std::map<std::string, std::string>::const_iterator connection = connections.find(stream->first);
+
+ if (connection != connections.end())
+ {
+ remove.insert(make_pair(connection->second, new AsteriskSCF::Media::V1::StreamInformation()));
+ }
+ }
+
+ if (remove.empty())
+ {
+ return;
+ }
+
+ SessionControllerPrx controller = session->getSessionController();
+
+ if (!controller)
+ {
+ return;
+ }
+
+ controller->begin_removeStreams(remove, newCallback_SessionController_removeStreams(this,
+ &RemoveStreamsOperation::removed,
+ &RemoveStreamsOperation::failed),
+ session);
+
+ mIgnore = true;
+}
+
+void RemoveStreamsOperation::removed(const SessionWrapperPtr& session)
+{
+ session->unplugMedia();
+ mSelf->unplugMedia();
+ session->setupMedia();
+ mSelf->setupMedia();
+}
+
+void RemoveStreamsOperation::failed(const Ice::Exception&, const SessionWrapperPtr&)
+{
+}
diff --git a/src/SessionOperations.h b/src/SessionOperations.h
index 97726fc..ce73388 100644
--- a/src/SessionOperations.h
+++ b/src/SessionOperations.h
@@ -16,6 +16,7 @@
#pragma once
#include <AsteriskSCF/SessionCommunications/SessionCommunicationsIf.h>
+#include <AsteriskSCF/Media/MediaIf.h>
#include <AsteriskSCF/logger.h>
#include "BridgeReplicatorIf.h"
#include "SessionWrapper.h"
@@ -77,6 +78,46 @@ private:
bool mIncludeConnected;
};
+class AddStreamsOperation : public std::unary_function<SessionWrapperPtr, void>, public Ice::Object
+{
+public:
+ AddStreamsOperation(const AsteriskSCF::SessionCommunications::V1::AMD_SessionController_addStreamsPtr& cb,
+ const SessionWrapperPtr& self,
+ const AsteriskSCF::Media::V1::StreamInformationDict& streams,
+ const AsteriskSCF::System::Logging::Logger& logger);
+
+ void operator()(const SessionWrapperPtr& session);
+
+ void added(const AsteriskSCF::Media::V1::StreamInformationDict& streams, const SessionWrapperPtr& session);
+ void failed(const Ice::Exception& ex, const SessionWrapperPtr& session);
+
+private:
+ AsteriskSCF::SessionCommunications::V1::AMD_SessionController_addStreamsPtr mCb;
+ SessionWrapperPtr mSelf;
+ AsteriskSCF::Media::V1::StreamInformationDict mStreams;
+ bool mIgnore;
+ AsteriskSCF::System::Logging::Logger mLogger;
+};
+
+typedef IceUtil::Handle<AddStreamsOperation> AddStreamsOperationPtr;
+
+class RemoveStreamsOperation : public std::unary_function<SessionWrapperPtr, void>, public Ice::Object
+{
+public:
+ RemoveStreamsOperation(const SessionWrapperPtr& self,
+ const AsteriskSCF::Media::V1::StreamInformationDict& streams);
+
+ void operator()(const SessionWrapperPtr& session);
+
+ void removed(const SessionWrapperPtr& session);
+ void failed(const Ice::Exception& ex, const SessionWrapperPtr& session);
+
+private:
+ SessionWrapperPtr mSelf;
+ AsteriskSCF::Media::V1::StreamInformationDict mStreams;
+ bool mIgnore;
+};
+
class IfStateCriteria
{
public:
diff --git a/src/SessionWrapper.cpp b/src/SessionWrapper.cpp
index ea9f8fa..1178f5e 100644
--- a/src/SessionWrapper.cpp
+++ b/src/SessionWrapper.cpp
@@ -467,6 +467,19 @@ SessionPrx SessionWrapper::getSession() const
return mSession->session;
}
+SessionControllerPrx SessionWrapper::getSessionController() const
+{
+ boost::shared_lock<boost::shared_mutex> lock(mLock);
+ return mSessionController;
+}
+
+void SessionWrapper::setSessionController(const AsteriskSCF::SessionCommunications::V1::SessionControllerPrx& controller)
+{
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ mSessionController = controller;
+}
+
+
void SessionWrapper::setupMedia()
{
mLogger(Debug) << FUNLOG << " for " << mId;
@@ -566,6 +579,16 @@ string SessionWrapper::id()
return mId;
}
+std::map<std::string, std::string> SessionWrapper::connections()
+{
+ if (mConnector)
+ {
+ return mConnector->connections();
+ }
+
+ return std::map<std::string, std::string>();
+}
+
void SessionWrapper::setup()
{
mLogger(Debug) << FUNLOG << ": starting setup of connected session " << mId;
diff --git a/src/SessionWrapper.h b/src/SessionWrapper.h
index 9ddb43c..0bdef83 100644
--- a/src/SessionWrapper.h
+++ b/src/SessionWrapper.h
@@ -81,6 +81,16 @@ public:
AsteriskSCF::SessionCommunications::V1::SessionPrx getSession() const;
/**
+ * Accesses the session controller proxy that the session gave us.
+ */
+ AsteriskSCF::SessionCommunications::V1::SessionControllerPrx getSessionController() const;
+
+ /**
+ * Sets the session controller used to control this session.
+ */
+ void setSessionController(const AsteriskSCF::SessionCommunications::V1::SessionControllerPrx&);
+
+ /**
* Sets the media connector object. This updates the media pairings associated
* with this session.
* Initiates replication.
@@ -114,6 +124,8 @@ public:
std::string id();
+ std::map<std::string, std::string> connections();
+
//
// Large scale macro operations. In effect the wrapper becomes a sub-component within the bridge.
//
@@ -144,6 +156,7 @@ private:
std::string mId;
MediaSplicerPtr mSplicer;
ExecutorPtr mActivities;
+ AsteriskSCF::SessionCommunications::V1::SessionControllerPrx mSessionController;
/**
* Sends changes to the replication service. This should never occur
-----------------------------------------------------------------------
--
asterisk-scf/integration/bridging.git
More information about the asterisk-scf-commits
mailing list