[asterisk-scf-commits] asterisk-scf/integration/media_rtp_pjmedia.git branch "media" created.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Mon Jun 13 15:40:14 CDT 2011


branch "media" has been created
        at  63fdf41d91413e8dff8cc87432abde609e30af89 (commit)

- Log -----------------------------------------------------------------
commit 63fdf41d91413e8dff8cc87432abde609e30af89
Author: Joshua Colp <jcolp at digium.com>
Date:   Sun Jun 12 18:00:41 2011 -0300

    Update to support media slice changes.

diff --git a/local-slice/RtpStateReplicationIf.ice b/local-slice/RtpStateReplicationIf.ice
index a8b4ac8..3f9189b 100644
--- a/local-slice/RtpStateReplicationIf.ice
+++ b/local-slice/RtpStateReplicationIf.ice
@@ -98,7 +98,7 @@ module V1
 
     class RtpStreamSourceStateItem extends RtpStateItem
     {
-	AsteriskSCF::Media::V1::StreamSink *mSink;
+	AsteriskSCF::Media::V1::StreamSinkSeq mSinks;
     };
 }; /* module V1 */
 
diff --git a/src/RTPSource.cpp b/src/RTPSource.cpp
index 3bc22c1..559eace 100644
--- a/src/RTPSource.cpp
+++ b/src/RTPSource.cpp
@@ -16,9 +16,12 @@
 
 #include <pjlib.h>
 #include <pjmedia.h>
+
 #include <Ice/Ice.h>
 #include <IceUtil/UUID.h>
 
+#include <boost/thread.hpp>
+
 #include <AsteriskSCF/Media/MediaIf.h>
 #include <AsteriskSCF/Media/RTP/MediaRTPIf.h>
 #include <AsteriskSCF/logger.h>
@@ -65,6 +68,11 @@ public:
      * Stream source state item.
      */
     RtpStreamSourceStateItemPtr mSourceStateItem;
+
+    /**
+     * Lock that protects information contained.
+     */
+    boost::shared_mutex mLock;
 };
 
 /**
@@ -87,21 +95,37 @@ StreamSourceRTPImpl::StreamSourceRTPImpl(const RTPSessionImplPtr& session, const
 }
 
 /**
- * Implementation of the setSink method as defined in MediaIf.ice
+ * Implementation of the addSink method as defined in MediaIf.ice
  */
-void StreamSourceRTPImpl::setSink(const AsteriskSCF::Media::V1::StreamSinkPrx& sink, const Ice::Current&)
+void StreamSourceRTPImpl::addSink(const AsteriskSCF::Media::V1::StreamSinkPrx& sink, const Ice::Current&)
 {
-    mImpl->mSourceStateItem->mSink = sink;
+    boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
+
+    mImpl->mSourceStateItem->mSinks.push_back(sink);
 
     mImpl->mSession->replicateState(0, 0, mImpl->mSourceStateItem);
 }
 
 /**
- * Implementation of the getSink method as defined in MediaIf.ice
+ * Implementation of the removeSink method as defined in MediaIf.ice
  */
-AsteriskSCF::Media::V1::StreamSinkPrx StreamSourceRTPImpl::getSink(const Ice::Current&)
+void StreamSourceRTPImpl::removeSink(const AsteriskSCF::Media::V1::StreamSinkPrx& sink, const Ice::Current&)
 {
-    return mImpl->mSourceStateItem->mSink;
+    boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
+
+    mImpl->mSourceStateItem->mSinks.erase(std::remove(mImpl->mSourceStateItem->mSinks.begin(),
+	    mImpl->mSourceStateItem->mSinks.end(),
+	    sink), mImpl->mSourceStateItem->mSinks.end());
+
+    mImpl->mSession->replicateState(0, 0, mImpl->mSourceStateItem);
+}
+
+/**
+ * Implementation of the getSinks method as defined in MediaIf.ice
+ */
+AsteriskSCF::Media::V1::StreamSinkSeq StreamSourceRTPImpl::getSinks(const Ice::Current&)
+{
+    return mImpl->mSourceStateItem->mSinks;
 }
 
 /**
@@ -187,45 +211,75 @@ static void receiveRTP(void *userdata, void *packet, pj_ssize_t size)
         return;
     }
 
-    if (source->mImpl->mSourceStateItem->mSink != 0)
+    // Update RTP stack information before writing to sinks, it's fine to do it
+    pjmedia_rtp_session_update(&source->mImpl->mIncomingSession, header, NULL);
+
+    if (source->mImpl->mSourceStateItem->mSinks.empty())
+    {
+	// No sinks present so frames can not go anywhere
+	return;
+    }
+
+    FormatPtr mediaformat = source->mImpl->mSession->getFormat(header->pt);
+
+    if (!mediaformat)
+    {
+	// If this is for a payload we don't know about just drop the frame
+	return;
+    }
+
+    FrameSeq frames;
+
+    AudioFormatPtr audioformat;
+    VideoFormatPtr videoformat;
+
+    if ((audioformat = AudioFormatPtr::dynamicCast(mediaformat)))
+    {
+        AudioFramePtr frame = new AudioFrame();
+        frame->mediaformat = mediaformat;
+
+        // Populate the common data
+        frame->timestamp = header->ts;
+        frame->seqno = header->seq;
+
+        // Copy the payload from the RTP packet into the frame
+        copy(payload, payload + payload_size, std::back_inserter(frame->payload));
+
+        // Into the frames sequence it goes
+        frames.push_back(frame);
+    }
+    else if ((videoformat = VideoFormatPtr::dynamicCast(mediaformat)))
+    {
+        VideoFramePtr frame = new VideoFrame();
+        frame->mediaformat = mediaformat;
+        frame->timestamp = header->ts;
+        frame->seqno = header->seq;
+        copy(payload, payload + payload_size, std::back_inserter(frame->payload));
+        frames.push_back(frame);
+    }
+
+    if (frames.empty())
     {
-        FormatPtr mediaformat = source->mImpl->mSession->getFormat(header->pt);
+        // If the media format ended up being a type we don't understand don't bother writing it out
+        return;
+    }
 
-        if (mediaformat != 0)
+    boost::shared_lock<boost::shared_mutex> lock(source->mImpl->mLock);
+
+    for (StreamSinkSeq::iterator sink = source->mImpl->mSourceStateItem->mSinks.begin();
+         sink != source->mImpl->mSourceStateItem->mSinks.end();
+         ++sink)
+    {
+        try
+        {
+            (*sink)->write(frames);
+        }
+        catch (const Ice::Exception&)
         {
-            FrameSeq frames;
-
-            AudioFormatPtr audioformat;
-
-            if ((audioformat = AudioFormatPtr::dynamicCast(mediaformat)))
-            {
-                AudioFramePtr frame = new AudioFrame();
-                frame->mediaformat = mediaformat;
-
-                /* Populate the common data */
-                frame->timestamp = header->ts;
-                frame->seqno = header->seq;
-
-                /* Copy the payload from the RTP packet to the frame, yahoo! */
-                copy(payload, payload + payload_size, std::back_inserter(frame->payload));
-
-                /* Into the sequence it goes, yarrrrrrrrrr matey */
-                frames.push_back(frame);
-            }
-
-            try
-            {
-                source->mImpl->mSourceStateItem->mSink->write(frames);
-            }
-            catch (const Ice::Exception&)
-            {
-                lg(Error) << "Exception caught while attempting to write media to an RTP sink";
-            }
+            lg(Error) << "Exception caught while attempting to write media to RTP sink " << (*sink);
         }
     }
 
-    /* Now that all is said and done update the internal RTP stack state */
-    pjmedia_rtp_session_update(&source->mImpl->mIncomingSession, header, NULL);
 }
 
 /**
@@ -279,3 +333,13 @@ RtpStreamSourceStateItemPtr StreamSourceRTPImpl::getStateItem()
 {
     return mImpl->mSourceStateItem;
 }
+
+/**
+ * API call which sets the sinks.
+ */
+void StreamSourceRTPImpl::setSinks(const AsteriskSCF::Media::V1::StreamSinkSeq& sinks)
+{
+    boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
+
+    mImpl->mSourceStateItem->mSinks = sinks;
+}
diff --git a/src/RTPSource.h b/src/RTPSource.h
index d12bc48..045caf3 100644
--- a/src/RTPSource.h
+++ b/src/RTPSource.h
@@ -22,15 +22,18 @@ class StreamSourceRTPImpl : public AsteriskSCF::Media::RTP::V1::StreamSourceRTP
 {
 public:
     StreamSourceRTPImpl(const RTPSessionImplPtr&, const std::string&);
-    void setSink(const AsteriskSCF::Media::V1::StreamSinkPrx&, const Ice::Current&);
-    AsteriskSCF::Media::V1::StreamSinkPrx getSink(const Ice::Current&);
+    void addSink(const AsteriskSCF::Media::V1::StreamSinkPrx&, const Ice::Current&);
+    void removeSink(const AsteriskSCF::Media::V1::StreamSinkPrx&, const Ice::Current&);
+    AsteriskSCF::Media::V1::StreamSinkSeq getSinks(const Ice::Current&);
     AsteriskSCF::Media::V1::FormatSeq getFormats(const Ice::Current&);
     std::string getId(const Ice::Current&);
     void requestFormat(const AsteriskSCF::Media::V1::FormatPtr&, const Ice::Current&);
     std::string getLocalAddress(const Ice::Current&);
     Ice::Int getLocalPort(const Ice::Current&);
+
     void setRemoteDetails(const std::string& address, Ice::Int port);
     AsteriskSCF::Media::RTP::V1::RtpStreamSourceStateItemPtr getStateItem();
+    void setSinks(const AsteriskSCF::Media::V1::StreamSinkSeq&);
 
     /**
      * Private implementation data for StreamSourceRTPImpl.
diff --git a/src/RtpStateReplicatorListener.cpp b/src/RtpStateReplicatorListener.cpp
index 4f3cef1..4a4bf9c 100644
--- a/src/RtpStateReplicatorListener.cpp
+++ b/src/RtpStateReplicatorListener.cpp
@@ -129,7 +129,7 @@ public:
                     mImpl->mStateItems.find(item->mSessionId);
 		if (i != mImpl->mStateItems.end())
 		{
-		    i->second->getSession()->getSource()->setSink(item->mSink, Ice::Current());
+		    i->second->getSession()->getSource()->setSinks(item->mSinks);
 		}
 	    }
 	};
diff --git a/test/TestRTPpjmedia.cpp b/test/TestRTPpjmedia.cpp
index 1bb6f19..4b665f1 100644
--- a/test/TestRTPpjmedia.cpp
+++ b/test/TestRTPpjmedia.cpp
@@ -494,7 +494,7 @@ BOOST_AUTO_TEST_CASE(ConfirmInitialReplicatedRTPSink)
 BOOST_AUTO_TEST_CASE(ConfirmInitialReplicatedRTPSource)
 {
     BOOST_CHECK(Testbed.mListener->mSource);
-    BOOST_CHECK(!Testbed.mListener->mSource->mSink);
+    BOOST_CHECK(Testbed.mListener->mSource->mSinks.empty());
 }
 
 /**
@@ -734,9 +734,9 @@ BOOST_AUTO_TEST_CASE(ConfirmSinkSetting)
         {
             StreamSourceRTPPrx source = StreamSourceRTPPrx::checkedCast((*i));
 
-            source->setSink(Testbed.sink);
+            source->addSink(Testbed.sink);
 
-            StreamSinkPrx sink = source->getSink();
+            StreamSinkPrx sink = source->getSinks().front();
 
             if (Testbed.sink == sink)
             {
@@ -850,7 +850,7 @@ BOOST_AUTO_TEST_CASE(ConfirmPopulatedReplicatedRTPSink)
  */
 BOOST_AUTO_TEST_CASE(ConfirmPopulatedReplicatedRTPSource)
 {
-    BOOST_CHECK(Testbed.mListener->mSource->mSink == Testbed.sink);
+    BOOST_CHECK(Testbed.mListener->mSource->mSinks.front() == Testbed.sink);
 }
 
 /**

-----------------------------------------------------------------------


-- 
asterisk-scf/integration/media_rtp_pjmedia.git



More information about the asterisk-scf-commits mailing list