[asterisk-scf-commits] asterisk-scf/integration/sip.git branch "directmedia" created.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Sat Jul 16 16:59:28 CDT 2011
branch "directmedia" has been created
at a274eda1bfb32791b092950b644dfc319edcfac8 (commit)
- Log -----------------------------------------------------------------
commit a274eda1bfb32791b092950b644dfc319edcfac8
Author: Joshua Colp <jcolp at digium.com>
Date: Sat Jul 16 19:00:17 2011 -0300
Add direct media connection support. This allows media to be directly connected external to Asterisk SCF.
diff --git a/slice/AsteriskSCF/Replication/SipSessionManager/SipStateReplicationIf.ice b/slice/AsteriskSCF/Replication/SipSessionManager/SipStateReplicationIf.ice
index db8dd79..be776f3 100644
--- a/slice/AsteriskSCF/Replication/SipSessionManager/SipStateReplicationIf.ice
+++ b/slice/AsteriskSCF/Replication/SipSessionManager/SipStateReplicationIf.ice
@@ -143,7 +143,7 @@ module V1
sequence<AsteriskSCF::SessionCommunications::V1::SessionListener*> SessionListenerSeq;
- sequence<AsteriskSCF::Media::RTP::V1::RTPSession*> RTPMediaSessionSeq;
+ dictionary<string, AsteriskSCF::Media::RTP::V1::RTPSession*> RTPMediaSessionDict;
class SipSessionStateItem extends SipStateItem
{
@@ -156,7 +156,7 @@ module V1
AsteriskSCF::Media::V1::StreamSourceSeq mSources;
AsteriskSCF::Media::V1::StreamSinkSeq mSinks;
AsteriskSCF::Media::V1::StreamInformationDict mStreams;
- RTPMediaSessionSeq mRTPMediaSessions;
+ RTPMediaSessionDict mRTPMediaSessions;
SessionListenerSeq mListeners;
AsteriskSCF::SessionCommunications::V1::Bridge *mBridge;
AsteriskSCF::SessionCommunications::V1::SessionCookieDict mCookies;
diff --git a/src/PJSipSessionModule.cpp b/src/PJSipSessionModule.cpp
index 6f0dbba..635dd8b 100644
--- a/src/PJSipSessionModule.cpp
+++ b/src/PJSipSessionModule.cpp
@@ -223,11 +223,11 @@ void PJSipSessionModule::replicateState(PJSipDialogModInfo *dlgInfo, PJSipTransa
lg(Debug) << "Session object identity: " << sessionInfo->mSessionState->mSessionObjectId.name;
lg(Debug) << "Media session object identity: " << sessionInfo->mSessionState->mMediaSessionObjectId.name;
- for (RTPMediaSessionSeq::const_iterator mediaSession = sessionInfo->mSessionState->mRTPMediaSessions.begin();
+ for (RTPMediaSessionDict::const_iterator mediaSession = sessionInfo->mSessionState->mRTPMediaSessions.begin();
mediaSession != sessionInfo->mSessionState->mRTPMediaSessions.end();
++mediaSession)
{
- lg(Debug) << "Media session: " << (*mediaSession);
+ lg(Debug) << "Media session: " << mediaSession->second;
}
lg(Debug) << "Bridge: " << sessionInfo->mSessionState->mBridge;
diff --git a/src/SipEndpoint.cpp b/src/SipEndpoint.cpp
index 67f753f..6dc9d62 100644
--- a/src/SipEndpoint.cpp
+++ b/src/SipEndpoint.cpp
@@ -345,7 +345,7 @@ AsteriskSCF::SipSessionManager::SipSessionPtr SipEndpoint::createSession(const s
AsteriskSCF::SipSessionManager::SipSessionPtr SipEndpoint::createSession(const std::string& destination,
const Ice::Identity& sessionid, const Ice::Identity& controllerid,
const Ice::Identity& mediaid,
- const AsteriskSCF::Replication::SipSessionManager::V1::RTPMediaSessionSeq& mediasessions,
+ const AsteriskSCF::Replication::SipSessionManager::V1::RTPMediaSessionDict& mediasessions,
const AsteriskSCF::Media::V1::StreamSourceSeq& sources,
const AsteriskSCF::Media::V1::StreamSinkSeq& sinks)
{
diff --git a/src/SipEndpoint.h b/src/SipEndpoint.h
index b92127f..3e4a9a2 100644
--- a/src/SipEndpoint.h
+++ b/src/SipEndpoint.h
@@ -240,7 +240,7 @@ public:
//
AsteriskSCF::SipSessionManager::SipSessionPtr createSession(const std::string&);
AsteriskSCF::SipSessionManager::SipSessionPtr createSession(const std::string&, const Ice::Identity&, const Ice::Identity&,
- const Ice::Identity&, const AsteriskSCF::Replication::SipSessionManager::V1::RTPMediaSessionSeq&,
+ const Ice::Identity&, const AsteriskSCF::Replication::SipSessionManager::V1::RTPMediaSessionDict&,
const AsteriskSCF::Media::V1::StreamSourceSeq&, const AsteriskSCF::Media::V1::StreamSinkSeq&);
void removeSession(const AsteriskSCF::SessionCommunications::V1::SessionPtr&);
diff --git a/src/SipSession.cpp b/src/SipSession.cpp
index 642b191..54fc1a4 100644
--- a/src/SipSession.cpp
+++ b/src/SipSession.cpp
@@ -221,9 +221,9 @@ public:
pjsip_inv_session *mInviteSession;
/**
- * A vector of RTP media sessions belonging to this endpoint.
+ * A dictionary of streams and their respective RTP session.
*/
- RTPMediaSessionSeq mRTPSessions;
+ RTPMediaSessionDict mRTPSessions;
/**
* A vector of media sources associated with this endpoint.
@@ -555,6 +555,181 @@ private:
SipSessionPtr mSession;
};
+/**
+ * Queued operation which works out which of the requested direct media connections this
+ * session can take part in, and answers the AMD callback with the permitted stream identifiers.
+ */
+class CheckDirectConnectionsOperation : public SuspendableWork
+{
+public:
+    /**
+     * @param cb AMD callback which receives the identifiers of the permitted streams.
+     * @param connections Requested connections, keyed by stream identifier, each carrying the sink proxy
+     *                    that media would be sent to.
+     * @param sessionPriv Private session data whose stream dictionary is consulted.
+     */
+    CheckDirectConnectionsOperation(
+        const AsteriskSCF::Media::V1::AMD_DirectMediaConnection_checkDirectConnectionsPtr& cb,
+        const AsteriskSCF::Media::V1::DirectMediaConnectionDict& connections,
+        const boost::shared_ptr<SipSessionPriv>& sessionPriv)
+        : mCb(cb), mConnections(connections), mImplPriv(sessionPriv) { }
+
+    /**
+     * Examines every requested connection and then completes the AMD callback.
+     *
+     * A connection is permitted when the stream is known to this session, the stream has a source,
+     * the supplied sink is an RTP sink, and both addresses agree on whether they contain a ':'
+     * (used here to tell an IPv6 address from an IPv4 one). A connection whose sink or source
+     * cannot be queried is treated as not permitted, so the callback is always answered.
+     *
+     * @return Always Complete.
+     */
+    SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
+    {
+        Ice::StringSeq allowedStreams;
+
+        lg(Debug) << "Executing a checkDirectConnections Operation";
+
+        // Iterate through each requested connection checking if it is possible
+        for (AsteriskSCF::Media::V1::DirectMediaConnectionDict::const_iterator connection = mConnections.begin();
+             connection != mConnections.end();
+             ++connection)
+        {
+            // Do we have a local stream that matches this?
+            StreamInformationDict::const_iterator stream = mImplPriv->mStreams.find(connection->first);
+
+            // A stream we know nothing about, or one without any source, cannot be connected.
+            // The emptiness check matters: calling front() on an empty sequence is undefined.
+            if (stream == mImplPriv->mStreams.end() || stream->second->sources.empty())
+            {
+                continue;
+            }
+
+            try
+            {
+                StreamSinkRTPPrx sink = StreamSinkRTPPrx::checkedCast(connection->second);
+
+                // If the provided sink is not an RTP one this connection is not possible
+                if (!sink)
+                {
+                    continue;
+                }
+
+                StreamSourceRTPPrx source = StreamSourceRTPPrx::uncheckedCast(stream->second->sources.front());
+
+                // uncheckedCast passes a null proxy straight through, so guard before invoking on it
+                if (!source)
+                {
+                    continue;
+                }
+
+                const std::string their_address = sink->getRemoteAddress();
+                const std::string our_address = source->getLocalAddress();
+
+                // Both addresses must belong to the same address family
+                const bool oursHasColon = our_address.find(':') != std::string::npos;
+                const bool theirsHasColon = their_address.find(':') != std::string::npos;
+
+                if (oursHasColon != theirsHasColon)
+                {
+                    continue;
+                }
+            }
+            catch (const Ice::Exception& ex)
+            {
+                // Without this the exception would leave execute() and the caller would never get a reply
+                lg(Debug) << "Unable to check direct connection for stream " << connection->first << ": " << ex.what();
+                continue;
+            }
+
+            // It passed! We can establish a direct external media connection
+            allowedStreams.push_back(connection->first);
+        }
+
+        mCb->ice_response(allowedStreams);
+
+        return Complete;
+    }
+
+private:
+    /** AMD callback completed at the end of execute(). */
+    AsteriskSCF::Media::V1::AMD_DirectMediaConnection_checkDirectConnectionsPtr mCb;
+
+    /** Copy of the requested connections, keyed by stream identifier. */
+    AsteriskSCF::Media::V1::DirectMediaConnectionDict mConnections;
+
+    /** Private implementation details for SipSession. */
+    boost::shared_ptr<SipSessionPriv> mImplPriv;
+};
+
+
+
+/**
+ * Queued operation which rewrites the session SDP so media flows directly to the supplied
+ * sinks, then either sends a re-INVITE or supplies the SDP as the answer to an INVITE
+ * transaction that is still in progress.
+ */
+class ConnectStreamsOperation : public SuspendableWork
+{
+public:
+    /**
+     * @param cb AMD callback completed once the operation has run.
+     * @param connections Connections to establish, keyed by stream identifier.
+     * @param session The SIP session whose SDP and invite session are used.
+     */
+    ConnectStreamsOperation(
+        const AsteriskSCF::Media::V1::AMD_DirectMediaConnection_connectStreamsPtr& cb,
+        const AsteriskSCF::Media::V1::DirectMediaConnectionDict& connections,
+        const SipSessionPtr& session)
+        : mCb(cb), mConnections(connections), mSession(session) { }
+
+    /**
+     * Applies the connections and always completes the AMD callback, with ice_response() normally
+     * or ice_exception() when modifying the SDP raised an exception.
+     *
+     * @return Always Complete.
+     */
+    SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
+    {
+        try
+        {
+            pjmedia_sdp_session *sdp = mSession->modifySDP(mConnections);
+            pjsip_inv_session *inviteSession = mSession->getInviteSession();
+
+            if (!sdp || !inviteSession)
+            {
+                // modifySDP returns 0 when the session has no SDP yet; there is nothing to signal then,
+                // and handing a null SDP or null invite session to PJSIP is not safe.
+                lg(Debug) << "Unable to connect streams, session has no SDP or invite session";
+            }
+            else if (!inviteSession->invite_tsx)
+            {
+                // No INVITE transaction outstanding, so the change is signalled with a re-INVITE
+                pjsip_tx_data *packet = NULL;
+
+                if ((pjsip_inv_reinvite(inviteSession, NULL, sdp, &packet)) == PJ_SUCCESS)
+                {
+                    pjsip_inv_send_msg(inviteSession, packet);
+                }
+                else
+                {
+                    lg(Debug) << "Unable to create a re-INVITE to connect streams";
+                }
+            }
+            else
+            {
+                // An INVITE transaction is still in progress, so provide the SDP as its answer
+                pjsip_inv_set_sdp_answer(inviteSession, sdp);
+            }
+        }
+        catch (const std::exception& ex)
+        {
+            // modifySDP talks to remote sinks; report the failure instead of leaving the caller waiting
+            mCb->ice_exception(ex);
+            return Complete;
+        }
+
+        mCb->ice_response();
+
+        return Complete;
+    }
+
+private:
+    /** AMD callback completed at the end of execute(). */
+    AsteriskSCF::Media::V1::AMD_DirectMediaConnection_connectStreamsPtr mCb;
+
+    /** Copy of the connections to establish, keyed by stream identifier. */
+    AsteriskSCF::Media::V1::DirectMediaConnectionDict mConnections;
+
+    /** The SIP session this operation acts on. */
+    const SipSessionPtr mSession;
+};
+
+
+/**
+ * Queued operation which rewrites the session SDP so the given streams point back at our
+ * local media details, then signals the change with a re-INVITE.
+ */
+class DisconnectStreamsOperation : public SuspendableWork
+{
+public:
+    /**
+     * @param cb AMD callback completed once the operation has run.
+     * @param streams Identifiers of the streams to disconnect.
+     * @param session The SIP session whose SDP and invite session are used.
+     */
+    DisconnectStreamsOperation(
+        const AsteriskSCF::Media::V1::AMD_DirectMediaConnection_disconnectStreamsPtr& cb,
+        const Ice::StringSeq& streams,
+        const SipSessionPtr& session)
+        : mCb(cb), mStreams(streams), mSession(session) { }
+
+    /**
+     * Restores the local details and always completes the AMD callback, with ice_response()
+     * normally or ice_exception() when modifying the SDP raised an exception.
+     *
+     * @return Always Complete.
+     */
+    SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
+    {
+        try
+        {
+            pjmedia_sdp_session *sdp = mSession->modifySDP(mStreams);
+            pjsip_inv_session *inviteSession = mSession->getInviteSession();
+
+            if (!sdp || !inviteSession)
+            {
+                // modifySDP returns 0 when the session has no SDP yet; there is nothing to signal then,
+                // and handing a null invite session to PJSIP is not safe.
+                lg(Debug) << "Unable to disconnect streams, session has no SDP or invite session";
+            }
+            else
+            {
+                pjsip_tx_data *packet = NULL;
+
+                if ((pjsip_inv_reinvite(inviteSession, NULL, sdp, &packet)) == PJ_SUCCESS)
+                {
+                    pjsip_inv_send_msg(inviteSession, packet);
+                }
+                else
+                {
+                    lg(Debug) << "Unable to create a re-INVITE to disconnect streams";
+                }
+            }
+        }
+        catch (const std::exception& ex)
+        {
+            // modifySDP talks to remote sources; report the failure instead of leaving the caller waiting
+            mCb->ice_exception(ex);
+            return Complete;
+        }
+
+        mCb->ice_response();
+
+        return Complete;
+    }
+
+private:
+    /** AMD callback completed at the end of execute(). */
+    AsteriskSCF::Media::V1::AMD_DirectMediaConnection_disconnectStreamsPtr mCb;
+
+    /** Copy of the identifiers of the streams to disconnect. */
+    Ice::StringSeq mStreams;
+
+    /** The SIP session this operation acts on. */
+    const SipSessionPtr mSession;
+};
+
+
+/**
+ * DirectMediaConnection servant for a SipSession. Each request is wrapped in a work item
+ * and handed to the session's work queue; the work item completes the AMD callback.
+ */
+class SipDirectMediaConnection : public AsteriskSCF::Media::V1::DirectMediaConnection
+{
+public:
+    /**
+     * @param implPriv Private session data passed on to the check operation.
+     * @param session The session whose work queue receives the operations.
+     */
+    SipDirectMediaConnection(boost::shared_ptr<SipSessionPriv> implPriv, const SipSessionPtr& session) :
+        mImplPriv(implPriv), mSession(session) { }
+
+    /**
+     * Queues a check of which of the requested connections are possible.
+     */
+    void checkDirectConnections_async(const AsteriskSCF::Media::V1::AMD_DirectMediaConnection_checkDirectConnectionsPtr& cb,
+        const AsteriskSCF::Media::V1::DirectMediaConnectionDict& connections, const Ice::Current&)
+    {
+        lg(Debug) << "Queueing checkDirectConnections operation";
+        SuspendableWorkPtr work = new CheckDirectConnectionsOperation(cb, connections, mImplPriv);
+        mSession->enqueueSessionWork(work);
+    }
+
+    /**
+     * Queues the establishment of the requested connections.
+     */
+    void connectStreams_async(const AsteriskSCF::Media::V1::AMD_DirectMediaConnection_connectStreamsPtr& cb,
+        const AsteriskSCF::Media::V1::DirectMediaConnectionDict& connections, const Ice::Current&)
+    {
+        lg(Debug) << "Queueing connectStreams operation";
+        SuspendableWorkPtr work = new ConnectStreamsOperation(cb, connections, mSession);
+        mSession->enqueueSessionWork(work);
+    }
+
+    /**
+     * Queues the disconnection of the given streams.
+     */
+    void disconnectStreams_async(const AsteriskSCF::Media::V1::AMD_DirectMediaConnection_disconnectStreamsPtr& cb,
+        const Ice::StringSeq& streams, const Ice::Current&)
+    {
+        lg(Debug) << "Queueing disconnectStreams operation";
+        SuspendableWorkPtr work = new DisconnectStreamsOperation(cb, streams, mSession);
+        mSession->enqueueSessionWork(work);
+    }
+
+private:
+    /**
+     * Private implementation details for SipSession.
+     */
+    boost::shared_ptr<SipSessionPriv> mImplPriv;
+
+    /**
+     * A pointer to the communications session that created us.
+     */
+    SipSessionPtr mSession;
+};
+
+
void SipSession::initializePJSIPStructs()
{
pj_str_t local_uri, remote_uri;
@@ -642,6 +817,9 @@ SipSession::SipSession(const Ice::ObjectAdapterPtr& adapter, const SipEndpointPt
mImplPriv->mOurSessionControllerProxy =
AsteriskSCF::SessionCommunications::V1::SessionControllerPrx::uncheckedCast(adapter->addWithUUID(mImplPriv->mOurSessionController));
+ DirectMediaConnectionPtr directMedia = new SipDirectMediaConnection(mImplPriv, this);
+ adapter->addFacet(directMedia, mImplPriv->mSessionProxy->ice_getIdentity(), directMediaConnectionFacet);
+
if (isUAC)
{
lg(Debug) << "New session is UAC, so we're creating the necessary PJSIP structures";
@@ -655,7 +833,7 @@ SipSession::SipSession(const Ice::ObjectAdapterPtr& adapter, const SipEndpointPt
SipSession::SipSession(const Ice::ObjectAdapterPtr& adapter, const SipEndpointPtr& endpoint,
const std::string& destination, const Ice::Identity& sessionid,
const Ice::Identity& controllerid,
- const Ice::Identity& mediaid, const AsteriskSCF::Replication::SipSessionManager::V1::RTPMediaSessionSeq& mediasessions,
+ const Ice::Identity& mediaid, const AsteriskSCF::Replication::SipSessionManager::V1::RTPMediaSessionDict& mediasessions,
const AsteriskSCF::Media::V1::StreamSourceSeq& sources, const AsteriskSCF::Media::V1::StreamSinkSeq& sinks,
PJSipManager *manager, const AsteriskSCF::Core::Discovery::V1::ServiceLocatorPrx& serviceLocator,
const AsteriskSCF::System::Component::V1::ReplicaPtr& replica, bool isUAC)
@@ -672,6 +850,9 @@ SipSession::SipSession(const Ice::ObjectAdapterPtr& adapter, const SipEndpointPt
mImplPriv->mOurSessionControllerProxy =
AsteriskSCF::SessionCommunications::V1::SessionControllerPrx::uncheckedCast(adapter->add(mImplPriv->mOurSessionController, controllerid));
+ DirectMediaConnectionPtr directMedia = new SipDirectMediaConnection(mImplPriv, this);
+ adapter->addFacet(directMedia, sessionid, directMediaConnectionFacet);
+
mImplPriv->mRTPSessions = mediasessions;
mImplPriv->mSources = sources;
mImplPriv->mSinks = sinks;
@@ -1431,12 +1612,12 @@ public:
{
//XXX This loop may be a candidate for making AMI-ified and returning "Suspended"
// Release all the RTP sessions we are using
- for (RTPMediaSessionSeq::const_iterator i = mSessionPriv->mRTPSessions.begin();
+ for (RTPMediaSessionDict::const_iterator i = mSessionPriv->mRTPSessions.begin();
i != mSessionPriv->mRTPSessions.end(); ++i)
{
try
{
- (*i)->release();
+ i->second->release();
}
catch (const Ice::Exception& ex)
{
@@ -1634,7 +1815,7 @@ pjmedia_sdp_session *SipSession::createSDPOffer(const AsteriskSCF::Media::V1::St
stream->second->sources.push_back(source);
// Update the SIP session with some RTP session details
- mImplPriv->mRTPSessions.push_back(session);
+ mImplPriv->mRTPSessions.insert(make_pair(boost::lexical_cast<std::string>(mImplPriv->mSDP->media_count), session));
// Add the stream to the SDP
pjmedia_sdp_media *media = allocate_from_pool<pjmedia_sdp_media>(mImplPriv->mDialog->pool);
@@ -1908,7 +2089,7 @@ pjmedia_sdp_session *SipSession::createSDPAnswer(const pjmedia_sdp_session* offe
ourStream->sources.push_back(source);
// Update the SIP session with some RTP session details
- mImplPriv->mRTPSessions.push_back(session);
+ mImplPriv->mRTPSessions.insert(make_pair(boost::lexical_cast<std::string>(stream), session));
// Just by common sense alone since we just got an RTP session for this stream we obviously
// have not added it to the answer SDP either, so do it
@@ -2014,7 +2195,7 @@ pjmedia_sdp_session *SipSession::createSDPAnswer(const pjmedia_sdp_session* offe
}
/**
- * Internal function which modifies our SDP.
+ * Internal function which modifies our SDP to remove streams.
*/
pjmedia_sdp_session *SipSession::modifySDP(const AsteriskSCF::Media::V1::StreamInformationDict& toRemove)
{
@@ -2061,6 +2242,91 @@ pjmedia_sdp_session *SipSession::modifySDP(const AsteriskSCF::Media::V1::StreamI
}
/**
+ * Internal function which modifies our SDP to contain remote destination and format details.
+ */
+pjmedia_sdp_session *SipSession::modifySDP(const AsteriskSCF::Media::V1::DirectMediaConnectionDict& connections)
+{
+    // Returns the session SDP after pointing each requested stream at the address, port and
+    // formats of the supplied sink, or 0 when the session has no SDP yet. Requests that do not
+    // refer to one of our media lines are skipped.
+
+    // In order to modify SDP you have to have SDP
+    if (!mImplPriv->mSDP)
+    {
+        return 0;
+    }
+
+    // Iterate through each connection request
+    for (AsteriskSCF::Media::V1::DirectMediaConnectionDict::const_iterator connection = connections.begin();
+         connection != connections.end();
+         ++connection)
+    {
+        // The stream identifier is used as an index into the SDP media array, so it has to parse
+        // as a number; one that does not cannot refer to any of our media lines.
+        unsigned int index = 0;
+
+        try
+        {
+            index = boost::lexical_cast<unsigned int>(connection->first);
+        }
+        catch (const boost::bad_lexical_cast&)
+        {
+            continue;
+        }
+
+        // Entries at or beyond media_count are not valid, so never read them
+        if (index >= mImplPriv->mSDP->media_count)
+        {
+            continue;
+        }
+
+        pjmedia_sdp_media *media = mImplPriv->mSDP->media[index];
+
+        // Skip unknown streams, and streams with no media level connection line to rewrite
+        if (!media || !media->conn)
+        {
+            continue;
+        }
+
+        StreamSinkRTPPrx sink = StreamSinkRTPPrx::uncheckedCast(connection->second);
+
+        // uncheckedCast passes a null proxy straight through, so guard before invoking on it
+        if (!sink)
+        {
+            continue;
+        }
+
+        // Query the sink before touching the SDP so a failed remote call cannot leave the
+        // connection line half updated
+        const std::string remoteAddress = sink->getRemoteAddress();
+        const pj_uint16_t remotePort = (pj_uint16_t) sink->getRemotePort();
+
+        // Update connection details on the stream
+        pj_strdup2(mImplPriv->mDialog->pool, &media->conn->addr, remoteAddress.c_str());
+        media->desc.port = remotePort;
+
+        // Drop all existing formats and attributes by changing the counts to zero, this is fine since the SDP generator
+        // uses the count to determine how far to go
+        media->desc.fmt_count = 0;
+        media->attr_count = 0;
+
+        // Add formats present on the sink, this is fine to do because an SDP descriptor will only be found for ones
+        // which are naturally configured on this endpoint
+        PayloadMap payloads;
+        addFormatstoSDP(sink->getFormats(), media, payloads);
+    }
+
+    return mImplPriv->mSDP;
+}
+
+/**
+ * Internal function which modifies our SDP to contain our local destination details and formats.
+ */
+pjmedia_sdp_session *SipSession::modifySDP(const Ice::StringSeq& streams)
+{
+    // Returns the session SDP after pointing each listed stream back at the local address,
+    // port and formats of its first source, or 0 when the session has no SDP yet. Streams
+    // that do not refer to one of our media lines are skipped.
+
+    // In order to modify SDP you have to have SDP
+    if (!mImplPriv->mSDP)
+    {
+        return 0;
+    }
+
+    // Iterate through each stream
+    for (Ice::StringSeq::const_iterator stream = streams.begin();
+         stream != streams.end();
+         ++stream)
+    {
+        StreamInformationDict::const_iterator ourStream = mImplPriv->mStreams.find((*stream));
+
+        // Ignore streams that we don't know about, and streams without a source to take the
+        // local details from (calling front() on an empty sequence is undefined)
+        if (ourStream == mImplPriv->mStreams.end() || ourStream->second->sources.empty())
+        {
+            continue;
+        }
+
+        // The stream identifier is used as an index into the SDP media array. The caller supplies
+        // it, so verify that it parses and is in range instead of trusting it.
+        unsigned int index = 0;
+
+        try
+        {
+            index = boost::lexical_cast<unsigned int>((*stream));
+        }
+        catch (const boost::bad_lexical_cast&)
+        {
+            continue;
+        }
+
+        if (index >= mImplPriv->mSDP->media_count)
+        {
+            continue;
+        }
+
+        pjmedia_sdp_media *media = mImplPriv->mSDP->media[index];
+
+        // Nothing to rewrite without a media entry carrying its own connection line
+        if (!media || !media->conn)
+        {
+            continue;
+        }
+
+        StreamSourceRTPPrx source = StreamSourceRTPPrx::uncheckedCast(ourStream->second->sources.front());
+
+        // uncheckedCast passes a null proxy straight through, so guard before invoking on it
+        if (!source)
+        {
+            continue;
+        }
+
+        // Query the source before touching the SDP so a failed remote call cannot leave the
+        // connection line half updated
+        const std::string localAddress = source->getLocalAddress();
+        const pj_uint16_t localPort = (pj_uint16_t) source->getLocalPort();
+
+        pj_strdup2(mImplPriv->mDialog->pool, &media->conn->addr, localAddress.c_str());
+        media->desc.port = localPort;
+
+        // Drop the existing formats and attributes by zeroing the counts, then add our own formats back
+        media->desc.fmt_count = 0;
+        media->attr_count = 0;
+
+        PayloadMap payloads;
+        addFormatstoSDP(ourStream->second->formats, media, payloads);
+    }
+
+    return mImplPriv->mSDP;
+}
+
+/**
* Internal function which sets the PJsip dialog.
*/
void SipSession::setDialog(pjsip_dialog *dialog)
@@ -2189,7 +2455,7 @@ void SipSession::setListeners(const SessionListenerSeq& listeners)
/**
* Internal function which returns the RTP media sessions that are hidden inside the SIP session.
*/
-RTPMediaSessionSeq SipSession::getRTPMediaSessions()
+RTPMediaSessionDict SipSession::getRTPMediaSessions()
{
return mImplPriv->mRTPSessions;
}
diff --git a/src/SipSession.h b/src/SipSession.h
index f82a369..767d62a 100644
--- a/src/SipSession.h
+++ b/src/SipSession.h
@@ -102,7 +102,7 @@ public:
SipSession(const Ice::ObjectAdapterPtr&, const SipEndpointPtr&, const std::string&, const Ice::Identity&,
const Ice::Identity&,
- const Ice::Identity&, const AsteriskSCF::Replication::SipSessionManager::V1::RTPMediaSessionSeq&,
+ const Ice::Identity&, const AsteriskSCF::Replication::SipSessionManager::V1::RTPMediaSessionDict&,
const AsteriskSCF::Media::V1::StreamSourceSeq&, const AsteriskSCF::Media::V1::StreamSinkSeq&,
PJSipManager *manager, const AsteriskSCF::Core::Discovery::V1::ServiceLocatorPrx& serviceLocator,
const AsteriskSCF::System::Component::V1::ReplicaPtr& replica,
@@ -195,6 +195,10 @@ public:
pjmedia_sdp_session *modifySDP(const AsteriskSCF::Media::V1::StreamInformationDict&);
+ pjmedia_sdp_session *modifySDP(const AsteriskSCF::Media::V1::DirectMediaConnectionDict&);
+
+ pjmedia_sdp_session *modifySDP(const Ice::StringSeq&);
+
void setDialog(pjsip_dialog *dialog);
pjsip_dialog *getDialog();
@@ -228,7 +232,7 @@ public:
void setStreams(const AsteriskSCF::Media::V1::StreamInformationDict& streams);
- AsteriskSCF::Replication::SipSessionManager::V1::RTPMediaSessionSeq getRTPMediaSessions();
+ AsteriskSCF::Replication::SipSessionManager::V1::RTPMediaSessionDict getRTPMediaSessions();
void enqueueSessionWork(const AsteriskSCF::System::WorkQueue::V1::SuspendableWorkPtr&);
private:
-----------------------------------------------------------------------
--
asterisk-scf/integration/sip.git
More information about the asterisk-scf-commits
mailing list