[asterisk-scf-commits] asterisk-scf/integration/media_rtp_pjmedia.git branch "media" created.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Mon Jun 13 15:40:14 CDT 2011
branch "media" has been created
at 63fdf41d91413e8dff8cc87432abde609e30af89 (commit)
- Log -----------------------------------------------------------------
commit 63fdf41d91413e8dff8cc87432abde609e30af89
Author: Joshua Colp <jcolp at digium.com>
Date: Sun Jun 12 18:00:41 2011 -0300
Update to support media slice changes.
diff --git a/local-slice/RtpStateReplicationIf.ice b/local-slice/RtpStateReplicationIf.ice
index a8b4ac8..3f9189b 100644
--- a/local-slice/RtpStateReplicationIf.ice
+++ b/local-slice/RtpStateReplicationIf.ice
@@ -98,7 +98,7 @@ module V1
class RtpStreamSourceStateItem extends RtpStateItem
{
- AsteriskSCF::Media::V1::StreamSink *mSink;
+ AsteriskSCF::Media::V1::StreamSinkSeq mSinks;
};
}; /* module V1 */
diff --git a/src/RTPSource.cpp b/src/RTPSource.cpp
index 3bc22c1..559eace 100644
--- a/src/RTPSource.cpp
+++ b/src/RTPSource.cpp
@@ -16,9 +16,12 @@
#include <pjlib.h>
#include <pjmedia.h>
+
#include <Ice/Ice.h>
#include <IceUtil/UUID.h>
+#include <boost/thread.hpp>
+
#include <AsteriskSCF/Media/MediaIf.h>
#include <AsteriskSCF/Media/RTP/MediaRTPIf.h>
#include <AsteriskSCF/logger.h>
@@ -65,6 +68,11 @@ public:
* Stream source state item.
*/
RtpStreamSourceStateItemPtr mSourceStateItem;
+
+ /**
+ * Lock that protects information contained.
+ */
+ boost::shared_mutex mLock;
};
/**
@@ -87,21 +95,37 @@ StreamSourceRTPImpl::StreamSourceRTPImpl(const RTPSessionImplPtr& session, const
}
/**
- * Implementation of the setSink method as defined in MediaIf.ice
+ * Implementation of the addSink method as defined in MediaIf.ice
*/
-void StreamSourceRTPImpl::setSink(const AsteriskSCF::Media::V1::StreamSinkPrx& sink, const Ice::Current&)
+void StreamSourceRTPImpl::addSink(const AsteriskSCF::Media::V1::StreamSinkPrx& sink, const Ice::Current&)
{
- mImpl->mSourceStateItem->mSink = sink;
+ boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
+
+ mImpl->mSourceStateItem->mSinks.push_back(sink);
mImpl->mSession->replicateState(0, 0, mImpl->mSourceStateItem);
}
/**
- * Implementation of the getSink method as defined in MediaIf.ice
+ * Implementation of the removeSink method as defined in MediaIf.ice
*/
-AsteriskSCF::Media::V1::StreamSinkPrx StreamSourceRTPImpl::getSink(const Ice::Current&)
+void StreamSourceRTPImpl::removeSink(const AsteriskSCF::Media::V1::StreamSinkPrx& sink, const Ice::Current&)
{
- return mImpl->mSourceStateItem->mSink;
+ boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
+
+ mImpl->mSourceStateItem->mSinks.erase(std::remove(mImpl->mSourceStateItem->mSinks.begin(),
+ mImpl->mSourceStateItem->mSinks.end(),
+ sink), mImpl->mSourceStateItem->mSinks.end());
+
+ mImpl->mSession->replicateState(0, 0, mImpl->mSourceStateItem);
+}
+
+/**
+ * Implementation of the getSinks method as defined in MediaIf.ice
+ */
+AsteriskSCF::Media::V1::StreamSinkSeq StreamSourceRTPImpl::getSinks(const Ice::Current&)
+{
+ return mImpl->mSourceStateItem->mSinks;
}
/**
@@ -187,45 +211,75 @@ static void receiveRTP(void *userdata, void *packet, pj_ssize_t size)
return;
}
- if (source->mImpl->mSourceStateItem->mSink != 0)
+ // Update RTP stack information before writing to sinks, it's fine to do it
+ pjmedia_rtp_session_update(&source->mImpl->mIncomingSession, header, NULL);
+
+ if (source->mImpl->mSourceStateItem->mSinks.empty())
+ {
+ // No sinks present so frames can not go anywhere
+ return;
+ }
+
+ FormatPtr mediaformat = source->mImpl->mSession->getFormat(header->pt);
+
+ if (!mediaformat)
+ {
+ // If this is for a payload we don't know about just drop the frame
+ return;
+ }
+
+ FrameSeq frames;
+
+ AudioFormatPtr audioformat;
+ VideoFormatPtr videoformat;
+
+ if ((audioformat = AudioFormatPtr::dynamicCast(mediaformat)))
+ {
+ AudioFramePtr frame = new AudioFrame();
+ frame->mediaformat = mediaformat;
+
+ // Populate the common data
+ frame->timestamp = header->ts;
+ frame->seqno = header->seq;
+
+ // Copy the payload from the RTP packet into the frame
+ copy(payload, payload + payload_size, std::back_inserter(frame->payload));
+
+ // Into the frames sequence it goes
+ frames.push_back(frame);
+ }
+ else if ((videoformat = VideoFormatPtr::dynamicCast(mediaformat)))
+ {
+ VideoFramePtr frame = new VideoFrame();
+ frame->mediaformat = mediaformat;
+ frame->timestamp = header->ts;
+ frame->seqno = header->seq;
+ copy(payload, payload + payload_size, std::back_inserter(frame->payload));
+ frames.push_back(frame);
+ }
+
+ if (frames.empty())
{
- FormatPtr mediaformat = source->mImpl->mSession->getFormat(header->pt);
+ // If the media format ended up being a type we don't understand don't bother writing it out
+ return;
+ }
- if (mediaformat != 0)
+ boost::shared_lock<boost::shared_mutex> lock(source->mImpl->mLock);
+
+ for (StreamSinkSeq::iterator sink = source->mImpl->mSourceStateItem->mSinks.begin();
+ sink != source->mImpl->mSourceStateItem->mSinks.end();
+ ++sink)
+ {
+ try
+ {
+ (*sink)->write(frames);
+ }
+ catch (const Ice::Exception&)
{
- FrameSeq frames;
-
- AudioFormatPtr audioformat;
-
- if ((audioformat = AudioFormatPtr::dynamicCast(mediaformat)))
- {
- AudioFramePtr frame = new AudioFrame();
- frame->mediaformat = mediaformat;
-
- /* Populate the common data */
- frame->timestamp = header->ts;
- frame->seqno = header->seq;
-
- /* Copy the payload from the RTP packet to the frame, yahoo! */
- copy(payload, payload + payload_size, std::back_inserter(frame->payload));
-
- /* Into the sequence it goes, yarrrrrrrrrr matey */
- frames.push_back(frame);
- }
-
- try
- {
- source->mImpl->mSourceStateItem->mSink->write(frames);
- }
- catch (const Ice::Exception&)
- {
- lg(Error) << "Exception caught while attempting to write media to an RTP sink";
- }
+ lg(Error) << "Exception caught while attempting to write media to RTP sink " << (*sink);
}
}
- /* Now that all is said and done update the internal RTP stack state */
- pjmedia_rtp_session_update(&source->mImpl->mIncomingSession, header, NULL);
}
/**
@@ -279,3 +333,13 @@ RtpStreamSourceStateItemPtr StreamSourceRTPImpl::getStateItem()
{
return mImpl->mSourceStateItem;
}
+
+/**
+ * API call which sets the sinks.
+ */
+void StreamSourceRTPImpl::setSinks(const AsteriskSCF::Media::V1::StreamSinkSeq& sinks)
+{
+ boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
+
+ mImpl->mSourceStateItem->mSinks = sinks;
+}
diff --git a/src/RTPSource.h b/src/RTPSource.h
index d12bc48..045caf3 100644
--- a/src/RTPSource.h
+++ b/src/RTPSource.h
@@ -22,15 +22,18 @@ class StreamSourceRTPImpl : public AsteriskSCF::Media::RTP::V1::StreamSourceRTP
{
public:
StreamSourceRTPImpl(const RTPSessionImplPtr&, const std::string&);
- void setSink(const AsteriskSCF::Media::V1::StreamSinkPrx&, const Ice::Current&);
- AsteriskSCF::Media::V1::StreamSinkPrx getSink(const Ice::Current&);
+ void addSink(const AsteriskSCF::Media::V1::StreamSinkPrx&, const Ice::Current&);
+ void removeSink(const AsteriskSCF::Media::V1::StreamSinkPrx&, const Ice::Current&);
+ AsteriskSCF::Media::V1::StreamSinkSeq getSinks(const Ice::Current&);
AsteriskSCF::Media::V1::FormatSeq getFormats(const Ice::Current&);
std::string getId(const Ice::Current&);
void requestFormat(const AsteriskSCF::Media::V1::FormatPtr&, const Ice::Current&);
std::string getLocalAddress(const Ice::Current&);
Ice::Int getLocalPort(const Ice::Current&);
+
void setRemoteDetails(const std::string& address, Ice::Int port);
AsteriskSCF::Media::RTP::V1::RtpStreamSourceStateItemPtr getStateItem();
+ void setSinks(const AsteriskSCF::Media::V1::StreamSinkSeq&);
/**
* Private implementation data for StreamSourceRTPImpl.
diff --git a/src/RtpStateReplicatorListener.cpp b/src/RtpStateReplicatorListener.cpp
index 4f3cef1..4a4bf9c 100644
--- a/src/RtpStateReplicatorListener.cpp
+++ b/src/RtpStateReplicatorListener.cpp
@@ -129,7 +129,7 @@ public:
mImpl->mStateItems.find(item->mSessionId);
if (i != mImpl->mStateItems.end())
{
- i->second->getSession()->getSource()->setSink(item->mSink, Ice::Current());
+ i->second->getSession()->getSource()->setSinks(item->mSinks);
}
}
};
diff --git a/test/TestRTPpjmedia.cpp b/test/TestRTPpjmedia.cpp
index 1bb6f19..4b665f1 100644
--- a/test/TestRTPpjmedia.cpp
+++ b/test/TestRTPpjmedia.cpp
@@ -494,7 +494,7 @@ BOOST_AUTO_TEST_CASE(ConfirmInitialReplicatedRTPSink)
BOOST_AUTO_TEST_CASE(ConfirmInitialReplicatedRTPSource)
{
BOOST_CHECK(Testbed.mListener->mSource);
- BOOST_CHECK(!Testbed.mListener->mSource->mSink);
+ BOOST_CHECK(Testbed.mListener->mSource->mSinks.empty());
}
/**
@@ -734,9 +734,9 @@ BOOST_AUTO_TEST_CASE(ConfirmSinkSetting)
{
StreamSourceRTPPrx source = StreamSourceRTPPrx::checkedCast((*i));
- source->setSink(Testbed.sink);
+ source->addSink(Testbed.sink);
- StreamSinkPrx sink = source->getSink();
+ StreamSinkPrx sink = source->getSinks().front();
if (Testbed.sink == sink)
{
@@ -850,7 +850,7 @@ BOOST_AUTO_TEST_CASE(ConfirmPopulatedReplicatedRTPSink)
*/
BOOST_AUTO_TEST_CASE(ConfirmPopulatedReplicatedRTPSource)
{
- BOOST_CHECK(Testbed.mListener->mSource->mSink == Testbed.sink);
+ BOOST_CHECK(Testbed.mListener->mSource->mSinks.front() == Testbed.sink);
}
/**
-----------------------------------------------------------------------
--
asterisk-scf/integration/media_rtp_pjmedia.git
More information about the asterisk-scf-commits
mailing list