[asterisk-scf-commits] asterisk-scf/integration/sip.git branch "directmedia" created.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Sat Jul 16 16:59:28 CDT 2011


branch "directmedia" has been created
        at  a274eda1bfb32791b092950b644dfc319edcfac8 (commit)

- Log -----------------------------------------------------------------
commit a274eda1bfb32791b092950b644dfc319edcfac8
Author: Joshua Colp <jcolp at digium.com>
Date:   Sat Jul 16 19:00:17 2011 -0300

    Add direct media connection support. This allows media to be directly connected external to Asterisk SCF.

diff --git a/slice/AsteriskSCF/Replication/SipSessionManager/SipStateReplicationIf.ice b/slice/AsteriskSCF/Replication/SipSessionManager/SipStateReplicationIf.ice
index db8dd79..be776f3 100644
--- a/slice/AsteriskSCF/Replication/SipSessionManager/SipStateReplicationIf.ice
+++ b/slice/AsteriskSCF/Replication/SipSessionManager/SipStateReplicationIf.ice
@@ -143,7 +143,7 @@ module V1
 
    sequence<AsteriskSCF::SessionCommunications::V1::SessionListener*> SessionListenerSeq;
 
-   sequence<AsteriskSCF::Media::RTP::V1::RTPSession*> RTPMediaSessionSeq;
+   dictionary<string, AsteriskSCF::Media::RTP::V1::RTPSession*> RTPMediaSessionDict;
 
    class SipSessionStateItem extends SipStateItem
    {
@@ -156,7 +156,7 @@ module V1
       AsteriskSCF::Media::V1::StreamSourceSeq mSources;
       AsteriskSCF::Media::V1::StreamSinkSeq mSinks;
       AsteriskSCF::Media::V1::StreamInformationDict mStreams;
-      RTPMediaSessionSeq mRTPMediaSessions;
+      RTPMediaSessionDict mRTPMediaSessions;
       SessionListenerSeq mListeners;
       AsteriskSCF::SessionCommunications::V1::Bridge *mBridge;
       AsteriskSCF::SessionCommunications::V1::SessionCookieDict mCookies;
diff --git a/src/PJSipSessionModule.cpp b/src/PJSipSessionModule.cpp
index 6f0dbba..635dd8b 100644
--- a/src/PJSipSessionModule.cpp
+++ b/src/PJSipSessionModule.cpp
@@ -223,11 +223,11 @@ void PJSipSessionModule::replicateState(PJSipDialogModInfo *dlgInfo, PJSipTransa
         lg(Debug) << "Session object identity: " << sessionInfo->mSessionState->mSessionObjectId.name;
         lg(Debug) << "Media session object identity: " << sessionInfo->mSessionState->mMediaSessionObjectId.name;
 
-	for (RTPMediaSessionSeq::const_iterator mediaSession = sessionInfo->mSessionState->mRTPMediaSessions.begin();
+	for (RTPMediaSessionDict::const_iterator mediaSession = sessionInfo->mSessionState->mRTPMediaSessions.begin();
              mediaSession != sessionInfo->mSessionState->mRTPMediaSessions.end();
              ++mediaSession)
         {
-            lg(Debug) << "Media session: " << (*mediaSession);
+            lg(Debug) << "Media session: " << mediaSession->second;
         }
 
         lg(Debug) << "Bridge: " << sessionInfo->mSessionState->mBridge;
diff --git a/src/SipEndpoint.cpp b/src/SipEndpoint.cpp
index 67f753f..6dc9d62 100644
--- a/src/SipEndpoint.cpp
+++ b/src/SipEndpoint.cpp
@@ -345,7 +345,7 @@ AsteriskSCF::SipSessionManager::SipSessionPtr SipEndpoint::createSession(const s
 AsteriskSCF::SipSessionManager::SipSessionPtr SipEndpoint::createSession(const std::string& destination,
                                                                          const Ice::Identity& sessionid, const Ice::Identity& controllerid,
                                                                          const Ice::Identity& mediaid,
-                                                                         const AsteriskSCF::Replication::SipSessionManager::V1::RTPMediaSessionSeq& mediasessions,
+                                                                         const AsteriskSCF::Replication::SipSessionManager::V1::RTPMediaSessionDict& mediasessions,
                                                                          const AsteriskSCF::Media::V1::StreamSourceSeq& sources,
                                                                          const AsteriskSCF::Media::V1::StreamSinkSeq& sinks)
 {
diff --git a/src/SipEndpoint.h b/src/SipEndpoint.h
index b92127f..3e4a9a2 100644
--- a/src/SipEndpoint.h
+++ b/src/SipEndpoint.h
@@ -240,7 +240,7 @@ public:
     //
     AsteriskSCF::SipSessionManager::SipSessionPtr createSession(const std::string&);
     AsteriskSCF::SipSessionManager::SipSessionPtr createSession(const std::string&, const Ice::Identity&, const Ice::Identity&,
-                                                                const Ice::Identity&, const AsteriskSCF::Replication::SipSessionManager::V1::RTPMediaSessionSeq&,
+                                                                const Ice::Identity&, const AsteriskSCF::Replication::SipSessionManager::V1::RTPMediaSessionDict&,
                                                                 const AsteriskSCF::Media::V1::StreamSourceSeq&, const AsteriskSCF::Media::V1::StreamSinkSeq&);
 
     void removeSession(const AsteriskSCF::SessionCommunications::V1::SessionPtr&);
diff --git a/src/SipSession.cpp b/src/SipSession.cpp
index 642b191..54fc1a4 100644
--- a/src/SipSession.cpp
+++ b/src/SipSession.cpp
@@ -221,9 +221,9 @@ public:
     pjsip_inv_session *mInviteSession;
 
     /**
-     * A vector of RTP media sessions belonging to this endpoint.
+     * A dictionary mapping each stream to its respective RTP session.
      */
-    RTPMediaSessionSeq mRTPSessions;
+    RTPMediaSessionDict mRTPSessions;
 
     /**
      * A vector of media sources associated with this endpoint.
@@ -555,6 +555,181 @@ private:
     SipSessionPtr mSession;
 };
 
+class CheckDirectConnectionsOperation : public SuspendableWork
+{
+public:
+    CheckDirectConnectionsOperation(
+        const AsteriskSCF::Media::V1::AMD_DirectMediaConnection_checkDirectConnectionsPtr& cb,
+        const AsteriskSCF::Media::V1::DirectMediaConnectionDict& connections,
+        const boost::shared_ptr<SipSessionPriv>& sessionPriv)
+        : mCb(cb), mConnections(connections), mImplPriv(sessionPriv) { }
+        
+    SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
+    {
+        Ice::StringSeq allowedStreams;
+
+        lg(Debug) << "Executing a checkDirectConnections Operation";
+
+        // Iterate through each requested connection checking if it is possible
+        for (AsteriskSCF::Media::V1::DirectMediaConnectionDict::const_iterator connection = mConnections.begin();
+             connection != mConnections.end();
+             ++connection)
+        {
+            // Do we have a local stream that matches this?
+	    StreamInformationDict::const_iterator stream = mImplPriv->mStreams.find(connection->first);
+
+            // If they passed in a stream that we know nothing about, skip it
+            if (stream == mImplPriv->mStreams.end())
+            {
+                continue;
+            }
+
+            StreamSinkRTPPrx sink = StreamSinkRTPPrx::checkedCast(connection->second);
+
+            // If the provided sink is not an RTP one this connection is not possible
+            if (!sink)
+            {
+                continue;
+            }
+
+            std::string their_address = sink->getRemoteAddress();
+
+	    StreamSourceRTPPrx source = StreamSourceRTPPrx::uncheckedCast(stream->second->sources.front());
+	    std::string our_address = source->getLocalAddress();
+
+            // Skip the connection unless both addresses are of the same family (a ':' is taken to mean IPv6)
+	    if ((our_address.find(":") == std::string::npos && their_address.find(":") != std::string::npos) ||
+		(our_address.find(":") != std::string::npos && their_address.find(":") == std::string::npos))
+	    {
+		continue;
+	    }
+
+            // It passed! We can establish a direct external media connection
+            allowedStreams.push_back(connection->first);
+        }
+
+        mCb->ice_response(allowedStreams);
+
+        return Complete;
+    }
+
+private:
+    AsteriskSCF::Media::V1::AMD_DirectMediaConnection_checkDirectConnectionsPtr mCb;
+    AsteriskSCF::Media::V1::DirectMediaConnectionDict mConnections;
+    boost::shared_ptr<SipSessionPriv> mImplPriv;        
+};
+
+
+class ConnectStreamsOperation : public SuspendableWork
+{
+public:
+    ConnectStreamsOperation(
+        const AsteriskSCF::Media::V1::AMD_DirectMediaConnection_connectStreamsPtr& cb,
+        const AsteriskSCF::Media::V1::DirectMediaConnectionDict& connections,
+        const SipSessionPtr& session)
+        : mCb(cb), mConnections(connections), mSession(session) { }
+
+    SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
+    {
+        pjmedia_sdp_session *sdp = mSession->modifySDP(mConnections);
+
+        if (!mSession->getInviteSession()->invite_tsx)
+        {
+            pjsip_tx_data *packet = NULL;
+
+            if ((pjsip_inv_reinvite(mSession->getInviteSession(), NULL, sdp, &packet)) == PJ_SUCCESS)
+            {
+                pjsip_inv_send_msg(mSession->getInviteSession(), packet);
+            }
+        }
+        else
+        {
+            pjsip_inv_set_sdp_answer(mSession->getInviteSession(), sdp);
+        }
+
+        mCb->ice_response();
+
+        return Complete;
+    }
+
+private:
+    AsteriskSCF::Media::V1::AMD_DirectMediaConnection_connectStreamsPtr mCb;
+    AsteriskSCF::Media::V1::DirectMediaConnectionDict mConnections;
+    const SipSessionPtr mSession;
+};
+
+class DisconnectStreamsOperation : public SuspendableWork
+{
+public:
+    DisconnectStreamsOperation(
+        const AsteriskSCF::Media::V1::AMD_DirectMediaConnection_disconnectStreamsPtr& cb,
+        const Ice::StringSeq& streams,
+        const SipSessionPtr& session)
+        : mCb(cb), mStreams(streams), mSession(session) { }
+
+    SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
+    {
+        pjmedia_sdp_session *sdp = mSession->modifySDP(mStreams);
+        pjsip_tx_data *packet = NULL;
+
+        if ((pjsip_inv_reinvite(mSession->getInviteSession(), NULL, sdp, &packet)) == PJ_SUCCESS)
+        {
+            pjsip_inv_send_msg(mSession->getInviteSession(), packet);
+        }
+
+        mCb->ice_response();
+
+        return Complete;
+    }
+
+private:
+    AsteriskSCF::Media::V1::AMD_DirectMediaConnection_disconnectStreamsPtr mCb;
+    Ice::StringSeq mStreams;
+    const SipSessionPtr mSession;
+};
+
+/**
+ * Implementation of a DirectMediaConnection interface for the SipSession.
+ */
+class SipDirectMediaConnection : public AsteriskSCF::Media::V1::DirectMediaConnection
+{
+public:
+    SipDirectMediaConnection(boost::shared_ptr<SipSessionPriv> implPriv, const SipSessionPtr& session) :
+        mImplPriv(implPriv), mSession(session) { }
+
+    void checkDirectConnections_async(const AsteriskSCF::Media::V1::AMD_DirectMediaConnection_checkDirectConnectionsPtr& cb,
+                                      const AsteriskSCF::Media::V1::DirectMediaConnectionDict& connections, const Ice::Current&)
+    {
+        lg(Debug) << "Queueing checkDirectConnections operation";
+        mSession->enqueueSessionWork(new CheckDirectConnectionsOperation(cb, connections, mImplPriv));
+    }
+
+    void connectStreams_async(const AsteriskSCF::Media::V1::AMD_DirectMediaConnection_connectStreamsPtr& cb,
+                              const AsteriskSCF::Media::V1::DirectMediaConnectionDict& connections, const Ice::Current&)
+    {
+        lg(Debug) << "Queueing connectStreams operation";
+        mSession->enqueueSessionWork(new ConnectStreamsOperation(cb, connections, mSession));
+    }
+
+    void disconnectStreams_async(const AsteriskSCF::Media::V1::AMD_DirectMediaConnection_disconnectStreamsPtr& cb,
+                                 const Ice::StringSeq& streams, const Ice::Current&)
+    {
+        lg(Debug) << "Queueing disconnectStreams operation";
+        mSession->enqueueSessionWork(new DisconnectStreamsOperation(cb, streams, mSession));
+    }
+
+private:
+    /**
+     * Private implementation details for SipSession.
+     */
+    boost::shared_ptr<SipSessionPriv> mImplPriv;
+
+    /**
+     * A pointer to the communications session that created us.
+     */
+    SipSessionPtr mSession;
+};
+
 void SipSession::initializePJSIPStructs()
 {
     pj_str_t local_uri, remote_uri;
@@ -642,6 +817,9 @@ SipSession::SipSession(const Ice::ObjectAdapterPtr& adapter, const SipEndpointPt
     mImplPriv->mOurSessionControllerProxy =
         AsteriskSCF::SessionCommunications::V1::SessionControllerPrx::uncheckedCast(adapter->addWithUUID(mImplPriv->mOurSessionController));
 
+    DirectMediaConnectionPtr directMedia = new SipDirectMediaConnection(mImplPriv, this);
+    adapter->addFacet(directMedia, mImplPriv->mSessionProxy->ice_getIdentity(), directMediaConnectionFacet);
+
     if (isUAC)
     {
         lg(Debug) << "New session is UAC, so we're creating the necessary PJSIP structures";
@@ -655,7 +833,7 @@ SipSession::SipSession(const Ice::ObjectAdapterPtr& adapter, const SipEndpointPt
 SipSession::SipSession(const Ice::ObjectAdapterPtr& adapter, const SipEndpointPtr& endpoint,
                        const std::string& destination, const Ice::Identity& sessionid,
                        const Ice::Identity& controllerid,
-                       const Ice::Identity& mediaid, const AsteriskSCF::Replication::SipSessionManager::V1::RTPMediaSessionSeq& mediasessions,
+                       const Ice::Identity& mediaid, const AsteriskSCF::Replication::SipSessionManager::V1::RTPMediaSessionDict& mediasessions,
                        const AsteriskSCF::Media::V1::StreamSourceSeq& sources, const AsteriskSCF::Media::V1::StreamSinkSeq& sinks,
                        PJSipManager *manager, const AsteriskSCF::Core::Discovery::V1::ServiceLocatorPrx& serviceLocator,
                        const AsteriskSCF::System::Component::V1::ReplicaPtr& replica, bool isUAC)
@@ -672,6 +850,9 @@ SipSession::SipSession(const Ice::ObjectAdapterPtr& adapter, const SipEndpointPt
     mImplPriv->mOurSessionControllerProxy =
         AsteriskSCF::SessionCommunications::V1::SessionControllerPrx::uncheckedCast(adapter->add(mImplPriv->mOurSessionController, controllerid));
 
+    DirectMediaConnectionPtr directMedia = new SipDirectMediaConnection(mImplPriv, this);
+    adapter->addFacet(directMedia, sessionid, directMediaConnectionFacet);
+
     mImplPriv->mRTPSessions = mediasessions;
     mImplPriv->mSources = sources;
     mImplPriv->mSinks = sinks;
@@ -1431,12 +1612,12 @@ public:
         {
             //XXX This loop may be a candidate for making AMI-ified and returning "Suspended"
             // Release all the RTP sessions we are using
-            for (RTPMediaSessionSeq::const_iterator i = mSessionPriv->mRTPSessions.begin();
+            for (RTPMediaSessionDict::const_iterator i = mSessionPriv->mRTPSessions.begin();
 		 i != mSessionPriv->mRTPSessions.end(); ++i)
             {
                 try
                 {
-                    (*i)->release();
+                    i->second->release();
                 }
                 catch (const Ice::Exception& ex)
                 {
@@ -1634,7 +1815,7 @@ pjmedia_sdp_session *SipSession::createSDPOffer(const AsteriskSCF::Media::V1::St
 	stream->second->sources.push_back(source);
 
         // Update the SIP session with some RTP session details
-        mImplPriv->mRTPSessions.push_back(session);
+	mImplPriv->mRTPSessions.insert(make_pair(boost::lexical_cast<std::string>(mImplPriv->mSDP->media_count), session));
 
         // Add the stream to the SDP
         pjmedia_sdp_media *media = allocate_from_pool<pjmedia_sdp_media>(mImplPriv->mDialog->pool);
@@ -1908,7 +2089,7 @@ pjmedia_sdp_session *SipSession::createSDPAnswer(const pjmedia_sdp_session* offe
             ourStream->sources.push_back(source);
 
             // Update the SIP session with some RTP session details
-            mImplPriv->mRTPSessions.push_back(session);
+	    mImplPriv->mRTPSessions.insert(make_pair(boost::lexical_cast<std::string>(stream), session));
 
             // Just by common sense alone since we just got an RTP session for this stream we obviously
             // have not added it to the answer SDP either, so do it
@@ -2014,7 +2195,7 @@ pjmedia_sdp_session *SipSession::createSDPAnswer(const pjmedia_sdp_session* offe
 }
 
 /**
- * Internal function which modifies our SDP.
+ * Internal function which modifies our SDP to remove streams.
  */
 pjmedia_sdp_session *SipSession::modifySDP(const AsteriskSCF::Media::V1::StreamInformationDict& toRemove)
 {
@@ -2061,6 +2242,91 @@ pjmedia_sdp_session *SipSession::modifySDP(const AsteriskSCF::Media::V1::StreamI
 }
 
 /**
+ * Internal function which modifies our SDP to contain remote destination and format details.
+ */
+pjmedia_sdp_session *SipSession::modifySDP(const AsteriskSCF::Media::V1::DirectMediaConnectionDict& connections)
+{
+    // In order to modify SDP you have to have SDP
+    if (!mImplPriv->mSDP)
+    {
+        return 0;
+    }
+
+    // Iterate through each connection request
+    for (AsteriskSCF::Media::V1::DirectMediaConnectionDict::const_iterator connection = connections.begin();
+	 connection != connections.end();
+	 ++connection)
+    {
+	pjmedia_sdp_media *media = mImplPriv->mSDP->media[boost::lexical_cast<int>(connection->first)];
+
+        // Skip unknown streams
+        if (!media)
+        {
+            continue;
+        }
+
+	StreamSinkRTPPrx sink = StreamSinkRTPPrx::uncheckedCast(connection->second);
+
+	// Update connection details on the stream
+	pj_strdup2(mImplPriv->mDialog->pool, &media->conn->addr, sink->getRemoteAddress().c_str());
+	media->desc.port = (pj_uint16_t) sink->getRemotePort();
+
+	// Drop all existing formats and attributes by resetting their counts to zero. This is fine since the SDP generator
+	// uses the counts to determine how far to go.
+	media->desc.fmt_count = 0;
+	media->attr_count = 0;
+
+	// Add the formats present on the sink. This is fine to do because an SDP descriptor will only be found for
+	// formats which are configured on this endpoint.
+	PayloadMap payloads;
+	addFormatstoSDP(sink->getFormats(), media, payloads);
+    }
+    
+    return mImplPriv->mSDP;
+}
+
+/**
+ * Internal function which modifies our SDP to contain our local destination details and formats.
+ */
+pjmedia_sdp_session *SipSession::modifySDP(const Ice::StringSeq& streams)
+{
+    // In order to modify SDP you have to have SDP
+    if (!mImplPriv->mSDP)
+    {
+        return 0;
+    }
+
+    // Iterate through each stream
+    for (Ice::StringSeq::const_iterator stream = streams.begin();
+         stream != streams.end();
+         ++stream)
+    {
+        StreamInformationDict::const_iterator ourStream = mImplPriv->mStreams.find((*stream));
+
+        // Ignore streams that we don't know about
+        if (ourStream == mImplPriv->mStreams.end())
+        {
+            continue;
+        }
+
+        // The SDP media entry and RTP source are assumed to exist for any known stream; neither is verified here
+        pjmedia_sdp_media *media = mImplPriv->mSDP->media[boost::lexical_cast<int>((*stream))];
+        StreamSourceRTPPrx source = StreamSourceRTPPrx::uncheckedCast(ourStream->second->sources.front());
+
+        pj_strdup2(mImplPriv->mDialog->pool, &media->conn->addr, source->getLocalAddress().c_str());
+        media->desc.port = (pj_uint16_t) source->getLocalPort();
+
+        media->desc.fmt_count = 0;
+        media->attr_count = 0;
+
+        PayloadMap payloads;
+        addFormatstoSDP(ourStream->second->formats, media, payloads);
+    }
+
+    return mImplPriv->mSDP;
+}
+
+/**
  * Internal function which sets the PJsip dialog.
  */
 void SipSession::setDialog(pjsip_dialog *dialog)
@@ -2189,7 +2455,7 @@ void SipSession::setListeners(const SessionListenerSeq& listeners)
 /**
  * Internal function which returns the RTP media sessions that are hidden inside the SIP session.
  */
-RTPMediaSessionSeq SipSession::getRTPMediaSessions()
+RTPMediaSessionDict SipSession::getRTPMediaSessions()
 {
     return mImplPriv->mRTPSessions;
 }
diff --git a/src/SipSession.h b/src/SipSession.h
index f82a369..767d62a 100644
--- a/src/SipSession.h
+++ b/src/SipSession.h
@@ -102,7 +102,7 @@ public:
 
     SipSession(const Ice::ObjectAdapterPtr&, const SipEndpointPtr&, const std::string&, const Ice::Identity&,
                const Ice::Identity&,
-               const Ice::Identity&, const AsteriskSCF::Replication::SipSessionManager::V1::RTPMediaSessionSeq&,
+               const Ice::Identity&, const AsteriskSCF::Replication::SipSessionManager::V1::RTPMediaSessionDict&,
                const AsteriskSCF::Media::V1::StreamSourceSeq&, const AsteriskSCF::Media::V1::StreamSinkSeq&,
                PJSipManager *manager, const AsteriskSCF::Core::Discovery::V1::ServiceLocatorPrx& serviceLocator,
                const AsteriskSCF::System::Component::V1::ReplicaPtr& replica,
@@ -195,6 +195,10 @@ public:
 
     pjmedia_sdp_session *modifySDP(const AsteriskSCF::Media::V1::StreamInformationDict&);
 
+    pjmedia_sdp_session *modifySDP(const AsteriskSCF::Media::V1::DirectMediaConnectionDict&);
+
+    pjmedia_sdp_session *modifySDP(const Ice::StringSeq&);
+
     void setDialog(pjsip_dialog *dialog);
 
     pjsip_dialog *getDialog();
@@ -228,7 +232,7 @@ public:
 
     void setStreams(const AsteriskSCF::Media::V1::StreamInformationDict& streams);
 
-    AsteriskSCF::Replication::SipSessionManager::V1::RTPMediaSessionSeq getRTPMediaSessions();
+    AsteriskSCF::Replication::SipSessionManager::V1::RTPMediaSessionDict getRTPMediaSessions();
 
     void enqueueSessionWork(const AsteriskSCF::System::WorkQueue::V1::SuspendableWorkPtr&);
 private:

-----------------------------------------------------------------------


-- 
asterisk-scf/integration/sip.git



More information about the asterisk-scf-commits mailing list