[asterisk-scf-commits] asterisk-scf/release/media_rtp_pjmedia.git branch "master" updated.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Sun Jul 10 09:49:07 CDT 2011


branch "master" has been updated
       via  e495e075f2f28738b0d8236dd76b353316240f41 (commit)
      from  cffbdedd90059681ec922ecc2e401e9b394cd70b (commit)

Summary of changes:
 .../MediaRTPPJMedia/RtpStateReplicationIf.ice      |    2 +-
 src/RTPSink.cpp                                    |    8 +-
 src/RTPSource.cpp                                  |  142 ++++++++++++++------
 src/RTPSource.h                                    |    7 +-
 src/RtpStateReplicatorListener.cpp                 |    2 +-
 test/TestRTPpjmedia.cpp                            |   20 ++--
 6 files changed, 124 insertions(+), 57 deletions(-)


- Log -----------------------------------------------------------------
commit e495e075f2f28738b0d8236dd76b353316240f41
Author: Joshua Colp <jcolp at digium.com>
Date:   Sun Jul 10 11:49:45 2011 -0300

    Merge revised media design implementation.

diff --git a/slice/AsteriskSCF/Replication/MediaRTPPJMedia/RtpStateReplicationIf.ice b/slice/AsteriskSCF/Replication/MediaRTPPJMedia/RtpStateReplicationIf.ice
index ee5862c..b9b08d7 100644
--- a/slice/AsteriskSCF/Replication/MediaRTPPJMedia/RtpStateReplicationIf.ice
+++ b/slice/AsteriskSCF/Replication/MediaRTPPJMedia/RtpStateReplicationIf.ice
@@ -98,7 +98,7 @@ module V1
 
     class RtpStreamSourceStateItem extends RtpStateItem
     {
-	AsteriskSCF::Media::V1::StreamSink *mSink;
+	AsteriskSCF::Media::V1::StreamSinkSeq mSinks;
     };
 }; /* module V1 */
 
diff --git a/src/RTPSink.cpp b/src/RTPSink.cpp
index 2cf4741..7c65568 100644
--- a/src/RTPSink.cpp
+++ b/src/RTPSink.cpp
@@ -98,7 +98,7 @@ void StreamSinkRTPImpl::write(const AsteriskSCF::Media::V1::FrameSeq& frames, co
         AudioFormatPtr audioformat;
 
         /* TODO: Add support for other types of media */
-        if (!(audioformat = AudioFormatPtr::dynamicCast((*frame)->mediaformat)))
+        if (!(audioformat = AudioFormatPtr::dynamicCast((*frame)->mediaFormat)))
         {
             continue;
         }
@@ -108,15 +108,15 @@ void StreamSinkRTPImpl::write(const AsteriskSCF::Media::V1::FrameSeq& frames, co
         int payload;
 
         // Only allow media formats through that we support
-        if ((payload = mImpl->mSession->getPayload((*frame)->mediaformat)) < 0)
+        if ((payload = mImpl->mSession->getPayload((*frame)->mediaFormat)) < 0)
         {
             throw UnsupportedMediaFormatException();
         }
 
         /* Using the available information construct an RTP header that we can place at the front of our packet */
         pj_status_t status = pjmedia_rtp_encode_rtp(&mImpl->mOutgoingSession,
-						    mImpl->mSession->getPayload((*frame)->mediaformat), 0, (int) (*frame)->payload.size(),
-						    (int) (*frame)->payload.size(), &header, &header_len);
+                                                    payload, 0, (int) (*frame)->payload.size(),
+                                                    (int) (*frame)->payload.size(), &header, &header_len);
 
         if (status != PJ_SUCCESS)
         {
diff --git a/src/RTPSource.cpp b/src/RTPSource.cpp
index 909bbc5..c7c9c58 100644
--- a/src/RTPSource.cpp
+++ b/src/RTPSource.cpp
@@ -16,9 +16,12 @@
 
 #include <pjlib.h>
 #include <pjmedia.h>
+
 #include <Ice/Ice.h>
 #include <IceUtil/UUID.h>
 
+#include <boost/thread.hpp>
+
 #include <AsteriskSCF/Media/MediaIf.h>
 #include <AsteriskSCF/Media/RTP/MediaRTPIf.h>
 #include <AsteriskSCF/logger.h>
@@ -66,6 +69,11 @@ public:
      * Stream source state item.
      */
     RtpStreamSourceStateItemPtr mSourceStateItem;
+
+    /**
+     * Lock that protects information contained.
+     */
+    boost::shared_mutex mLock;
 };
 
 /**
@@ -88,21 +96,37 @@ StreamSourceRTPImpl::StreamSourceRTPImpl(const RTPSessionImplPtr& session, const
 }
 
 /**
- * Implementation of the setSink method as defined in MediaIf.ice
+ * Implementation of the addSink method as defined in MediaIf.ice
  */
-void StreamSourceRTPImpl::setSink(const AsteriskSCF::Media::V1::StreamSinkPrx& sink, const Ice::Current&)
+void StreamSourceRTPImpl::addSink(const AsteriskSCF::Media::V1::StreamSinkPrx& sink, const Ice::Current&)
 {
-    mImpl->mSourceStateItem->mSink = sink;
+    boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
+
+    mImpl->mSourceStateItem->mSinks.push_back(sink);
 
     mImpl->mSession->replicateState(0, 0, mImpl->mSourceStateItem);
 }
 
 /**
- * Implementation of the getSink method as defined in MediaIf.ice
+ * Implementation of the removeSink method as defined in MediaIf.ice
  */
-AsteriskSCF::Media::V1::StreamSinkPrx StreamSourceRTPImpl::getSink(const Ice::Current&)
+void StreamSourceRTPImpl::removeSink(const AsteriskSCF::Media::V1::StreamSinkPrx& sink, const Ice::Current&)
 {
-    return mImpl->mSourceStateItem->mSink;
+    boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
+
+    mImpl->mSourceStateItem->mSinks.erase(std::remove(mImpl->mSourceStateItem->mSinks.begin(),
+	    mImpl->mSourceStateItem->mSinks.end(),
+	    sink), mImpl->mSourceStateItem->mSinks.end());
+
+    mImpl->mSession->replicateState(0, 0, mImpl->mSourceStateItem);
+}
+
+/**
+ * Implementation of the getSinks method as defined in MediaIf.ice
+ */
+AsteriskSCF::Media::V1::StreamSinkSeq StreamSourceRTPImpl::getSinks(const Ice::Current&)
+{
+    return mImpl->mSourceStateItem->mSinks;
 }
 
 /**
@@ -188,45 +212,75 @@ static void receiveRTP(void *userdata, void *packet, pj_ssize_t size)
         return;
     }
 
-    if (source->mImpl->mSourceStateItem->mSink != 0)
+    // Update RTP stack information before writing to sinks, it's fine to do it
+    pjmedia_rtp_session_update(&source->mImpl->mIncomingSession, header, NULL);
+
+    if (source->mImpl->mSourceStateItem->mSinks.empty())
+    {
+	// No sinks present so frames can not go anywhere
+	return;
+    }
+
+    FormatPtr mediaformat = source->mImpl->mSession->getFormat(header->pt);
+
+    if (!mediaformat)
+    {
+	// If this is for a payload we don't know about just drop the frame
+	return;
+    }
+
+    FrameSeq frames;
+
+    AudioFormatPtr audioformat;
+    VideoFormatPtr videoformat;
+
+    if ((audioformat = AudioFormatPtr::dynamicCast(mediaformat)))
+    {
+        AudioFramePtr frame = new AudioFrame();
+        frame->mediaFormat = mediaformat;
+
+        // Populate the common data
+        frame->timestamp = header->ts;
+        frame->seqno = header->seq;
+
+        // Copy the payload from the RTP packet into the frame
+        copy(payload, payload + payload_size, std::back_inserter(frame->payload));
+
+        // Into the frames sequence it goes
+        frames.push_back(frame);
+    }
+    else if ((videoformat = VideoFormatPtr::dynamicCast(mediaformat)))
+    {
+        VideoFramePtr frame = new VideoFrame();
+        frame->mediaFormat = mediaformat;
+        frame->timestamp = header->ts;
+        frame->seqno = header->seq;
+        copy(payload, payload + payload_size, std::back_inserter(frame->payload));
+        frames.push_back(frame);
+    }
+
+    if (frames.empty())
     {
-        FormatPtr mediaformat = source->mImpl->mSession->getFormat(header->pt);
+        // If the media format ended up being a type we don't understand don't bother writing it out
+        return;
+    }
 
-        if (mediaformat != 0)
+    boost::shared_lock<boost::shared_mutex> lock(source->mImpl->mLock);
+
+    for (StreamSinkSeq::iterator sink = source->mImpl->mSourceStateItem->mSinks.begin();
+         sink != source->mImpl->mSourceStateItem->mSinks.end();
+         ++sink)
+    {
+        try
+        {
+            (*sink)->write(frames);
+        }
+        catch (const Ice::Exception&)
         {
-            FrameSeq frames;
-
-            AudioFormatPtr audioformat;
-
-            if ((audioformat = AudioFormatPtr::dynamicCast(mediaformat)))
-            {
-                AudioFramePtr frame = new AudioFrame();
-                frame->mediaformat = mediaformat;
-
-                /* Populate the common data */
-                frame->timestamp = header->ts;
-                frame->seqno = header->seq;
-
-                /* Copy the payload from the RTP packet to the frame, yahoo! */
-                copy(payload, payload + payload_size, std::back_inserter(frame->payload));
-
-                /* Into the sequence it goes, yarrrrrrrrrr matey */
-                frames.push_back(frame);
-            }
-
-            try
-            {
-                source->mImpl->mSourceStateItem->mSink->write(frames);
-            }
-            catch (const Ice::Exception&)
-            {
-                lg(Error) << "Exception caught while attempting to write media to an RTP sink";
-            }
+            lg(Error) << "Exception caught while attempting to write media to RTP sink " << (*sink);
         }
     }
 
-    /* Now that all is said and done update the internal RTP stack state */
-    pjmedia_rtp_session_update(&source->mImpl->mIncomingSession, header, NULL);
 }
 
 /**
@@ -280,3 +334,13 @@ RtpStreamSourceStateItemPtr StreamSourceRTPImpl::getStateItem()
 {
     return mImpl->mSourceStateItem;
 }
+
+/**
+ * API call which sets the sinks.
+ */
+void StreamSourceRTPImpl::setSinks(const AsteriskSCF::Media::V1::StreamSinkSeq& sinks)
+{
+    boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
+
+    mImpl->mSourceStateItem->mSinks = sinks;
+}
diff --git a/src/RTPSource.h b/src/RTPSource.h
index 9da0b2a..f032f88 100644
--- a/src/RTPSource.h
+++ b/src/RTPSource.h
@@ -22,14 +22,17 @@ class StreamSourceRTPImpl : public AsteriskSCF::Media::RTP::V1::StreamSourceRTP
 {
 public:
     StreamSourceRTPImpl(const RTPSessionImplPtr&, const std::string&);
-    void setSink(const AsteriskSCF::Media::V1::StreamSinkPrx&, const Ice::Current&);
-    AsteriskSCF::Media::V1::StreamSinkPrx getSink(const Ice::Current&);
+    void addSink(const AsteriskSCF::Media::V1::StreamSinkPrx&, const Ice::Current&);
+    void removeSink(const AsteriskSCF::Media::V1::StreamSinkPrx&, const Ice::Current&);
+    AsteriskSCF::Media::V1::StreamSinkSeq getSinks(const Ice::Current&);
     AsteriskSCF::Media::V1::FormatSeq getFormats(const Ice::Current&);
     std::string getId(const Ice::Current&);
     void requestFormat(const AsteriskSCF::Media::V1::FormatPtr&, const Ice::Current&);
     std::string getLocalAddress(const Ice::Current&);
     Ice::Int getLocalPort(const Ice::Current&);
+
     void setRemoteDetails(const std::string& address, Ice::Int port);
+    void setSinks(const AsteriskSCF::Media::V1::StreamSinkSeq&);
     AsteriskSCF::Replication::MediaRTPPJMedia::V1::RtpStreamSourceStateItemPtr getStateItem();
 
     /**
diff --git a/src/RtpStateReplicatorListener.cpp b/src/RtpStateReplicatorListener.cpp
index e3af713..d7a43f5 100644
--- a/src/RtpStateReplicatorListener.cpp
+++ b/src/RtpStateReplicatorListener.cpp
@@ -130,7 +130,7 @@ public:
                     mImpl->mStateItems.find(item->mSessionId);
 		if (i != mImpl->mStateItems.end())
 		{
-		    i->second->getSession()->getSource()->setSink(item->mSink, Ice::Current());
+		    i->second->getSession()->getSource()->setSinks(item->mSinks);
 		}
 	    }
 	};
diff --git a/test/TestRTPpjmedia.cpp b/test/TestRTPpjmedia.cpp
index 36eab74..212c58e 100644
--- a/test/TestRTPpjmedia.cpp
+++ b/test/TestRTPpjmedia.cpp
@@ -495,7 +495,7 @@ BOOST_AUTO_TEST_CASE(ConfirmInitialReplicatedRTPSink)
 BOOST_AUTO_TEST_CASE(ConfirmInitialReplicatedRTPSource)
 {
     BOOST_CHECK(Testbed.mListener->mSource);
-    BOOST_CHECK(!Testbed.mListener->mSource->mSink);
+    BOOST_CHECK(Testbed.mListener->mSource->mSinks.empty());
 }
 
 /**
@@ -645,7 +645,7 @@ BOOST_AUTO_TEST_CASE(TransmitFrametoEmptySink)
         format->frameSize = 20;
 
         AudioFramePtr frame = new AudioFrame();
-        frame->mediaformat = format;
+        frame->mediaFormat = format;
 
         /* Populate the payload with some useless data, but enough to confirm the payload passes unaltered. */
         frame->payload.push_back('a');
@@ -735,9 +735,9 @@ BOOST_AUTO_TEST_CASE(ConfirmSinkSetting)
         {
             StreamSourceRTPPrx source = StreamSourceRTPPrx::checkedCast((*i));
 
-            source->setSink(Testbed.sink);
+            source->addSink(Testbed.sink);
 
-            StreamSinkPrx sink = source->getSink();
+            StreamSinkPrx sink = source->getSinks().front();
 
             if (Testbed.sink == sink)
             {
@@ -851,7 +851,7 @@ BOOST_AUTO_TEST_CASE(ConfirmPopulatedReplicatedRTPSink)
  */
 BOOST_AUTO_TEST_CASE(ConfirmPopulatedReplicatedRTPSource)
 {
-    BOOST_CHECK(Testbed.mListener->mSource->mSink == Testbed.sink);
+    BOOST_CHECK(Testbed.mListener->mSource->mSinks.front() == Testbed.sink);
 }
 
 /**
@@ -869,7 +869,7 @@ BOOST_AUTO_TEST_CASE(TransmitandReceiveFrame)
         format->frameSize = 20;
 
         AudioFramePtr frame = new AudioFrame();
-        frame->mediaformat = format;
+        frame->mediaFormat = format;
 
         /* Populate the payload with some useless data, but enough to confirm the payload passes unaltered. */
         frame->payload.push_back('a');
@@ -901,10 +901,10 @@ BOOST_AUTO_TEST_CASE(TransmitandReceiveFrame)
         AudioFramePtr received_frame;
         if (Testbed.frames.size() == 1 &&
             (received_frame = AudioFramePtr::dynamicCast(Testbed.frames.front())) &&
-            (received_frame->mediaformat->name == format->name))
+            (received_frame->mediaFormat->name == format->name))
         {
             AudioFormatPtr received_format;
-            if ((received_format = AudioFormatPtr::dynamicCast(received_frame->mediaformat)) &&
+            if ((received_format = AudioFormatPtr::dynamicCast(received_frame->mediaFormat)) &&
                 (received_format->frameSize == format->frameSize) &&
                 (received_frame->payload == frame->payload))
             {
@@ -938,7 +938,7 @@ BOOST_AUTO_TEST_CASE(TransmitFrameWithUnsupportedMediaFormat)
         format->frameSize = 20;
 
         AudioFramePtr frame = new AudioFrame();
-        frame->mediaformat = format;
+        frame->mediaFormat = format;
 
         frame->payload.push_back('a');
         frame->payload.push_back('b');
@@ -1015,7 +1015,7 @@ BOOST_AUTO_TEST_CASE(ReceiveUnknownRTPPacket)
         sink->setRemoteDetails(address, port);
 
         AudioFramePtr frame = new AudioFrame();
-        frame->mediaformat = format;
+        frame->mediaFormat = format;
 
         /* Populate the payload with some useless data, but enough to confirm the payload passes unaltered. */
         frame->payload.push_back('a');

-----------------------------------------------------------------------


-- 
asterisk-scf/release/media_rtp_pjmedia.git



More information about the asterisk-scf-commits mailing list