[asterisk-scf-commits] asterisk-scf/release/media_rtp_pjmedia.git branch "master" updated.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Sun Jul 10 09:49:07 CDT 2011
branch "master" has been updated
via e495e075f2f28738b0d8236dd76b353316240f41 (commit)
from cffbdedd90059681ec922ecc2e401e9b394cd70b (commit)
Summary of changes:
.../MediaRTPPJMedia/RtpStateReplicationIf.ice | 2 +-
src/RTPSink.cpp | 8 +-
src/RTPSource.cpp | 142 ++++++++++++++------
src/RTPSource.h | 7 +-
src/RtpStateReplicatorListener.cpp | 2 +-
test/TestRTPpjmedia.cpp | 20 ++--
6 files changed, 124 insertions(+), 57 deletions(-)
- Log -----------------------------------------------------------------
commit e495e075f2f28738b0d8236dd76b353316240f41
Author: Joshua Colp <jcolp at digium.com>
Date: Sun Jul 10 11:49:45 2011 -0300
Merge revised media design implementation.
diff --git a/slice/AsteriskSCF/Replication/MediaRTPPJMedia/RtpStateReplicationIf.ice b/slice/AsteriskSCF/Replication/MediaRTPPJMedia/RtpStateReplicationIf.ice
index ee5862c..b9b08d7 100644
--- a/slice/AsteriskSCF/Replication/MediaRTPPJMedia/RtpStateReplicationIf.ice
+++ b/slice/AsteriskSCF/Replication/MediaRTPPJMedia/RtpStateReplicationIf.ice
@@ -98,7 +98,7 @@ module V1
class RtpStreamSourceStateItem extends RtpStateItem
{
- AsteriskSCF::Media::V1::StreamSink *mSink;
+ AsteriskSCF::Media::V1::StreamSinkSeq mSinks;
};
}; /* module V1 */
diff --git a/src/RTPSink.cpp b/src/RTPSink.cpp
index 2cf4741..7c65568 100644
--- a/src/RTPSink.cpp
+++ b/src/RTPSink.cpp
@@ -98,7 +98,7 @@ void StreamSinkRTPImpl::write(const AsteriskSCF::Media::V1::FrameSeq& frames, co
AudioFormatPtr audioformat;
/* TODO: Add support for other types of media */
- if (!(audioformat = AudioFormatPtr::dynamicCast((*frame)->mediaformat)))
+ if (!(audioformat = AudioFormatPtr::dynamicCast((*frame)->mediaFormat)))
{
continue;
}
@@ -108,15 +108,15 @@ void StreamSinkRTPImpl::write(const AsteriskSCF::Media::V1::FrameSeq& frames, co
int payload;
// Only allow media formats through that we support
- if ((payload = mImpl->mSession->getPayload((*frame)->mediaformat)) < 0)
+ if ((payload = mImpl->mSession->getPayload((*frame)->mediaFormat)) < 0)
{
throw UnsupportedMediaFormatException();
}
/* Using the available information construct an RTP header that we can place at the front of our packet */
pj_status_t status = pjmedia_rtp_encode_rtp(&mImpl->mOutgoingSession,
- mImpl->mSession->getPayload((*frame)->mediaformat), 0, (int) (*frame)->payload.size(),
- (int) (*frame)->payload.size(), &header, &header_len);
+ payload, 0, (int) (*frame)->payload.size(),
+ (int) (*frame)->payload.size(), &header, &header_len);
if (status != PJ_SUCCESS)
{
diff --git a/src/RTPSource.cpp b/src/RTPSource.cpp
index 909bbc5..c7c9c58 100644
--- a/src/RTPSource.cpp
+++ b/src/RTPSource.cpp
@@ -16,9 +16,12 @@
#include <pjlib.h>
#include <pjmedia.h>
+
#include <Ice/Ice.h>
#include <IceUtil/UUID.h>
+#include <boost/thread.hpp>
+
#include <AsteriskSCF/Media/MediaIf.h>
#include <AsteriskSCF/Media/RTP/MediaRTPIf.h>
#include <AsteriskSCF/logger.h>
@@ -66,6 +69,11 @@ public:
* Stream source state item.
*/
RtpStreamSourceStateItemPtr mSourceStateItem;
+
+ /**
+ * Lock that protects information contained.
+ */
+ boost::shared_mutex mLock;
};
/**
@@ -88,21 +96,37 @@ StreamSourceRTPImpl::StreamSourceRTPImpl(const RTPSessionImplPtr& session, const
}
/**
- * Implementation of the setSink method as defined in MediaIf.ice
+ * Implementation of the addSink method as defined in MediaIf.ice
*/
-void StreamSourceRTPImpl::setSink(const AsteriskSCF::Media::V1::StreamSinkPrx& sink, const Ice::Current&)
+void StreamSourceRTPImpl::addSink(const AsteriskSCF::Media::V1::StreamSinkPrx& sink, const Ice::Current&)
{
- mImpl->mSourceStateItem->mSink = sink;
+ boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
+
+ mImpl->mSourceStateItem->mSinks.push_back(sink);
mImpl->mSession->replicateState(0, 0, mImpl->mSourceStateItem);
}
/**
- * Implementation of the getSink method as defined in MediaIf.ice
+ * Implementation of the removeSink method as defined in MediaIf.ice
*/
-AsteriskSCF::Media::V1::StreamSinkPrx StreamSourceRTPImpl::getSink(const Ice::Current&)
+void StreamSourceRTPImpl::removeSink(const AsteriskSCF::Media::V1::StreamSinkPrx& sink, const Ice::Current&)
{
- return mImpl->mSourceStateItem->mSink;
+ boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
+
+ mImpl->mSourceStateItem->mSinks.erase(std::remove(mImpl->mSourceStateItem->mSinks.begin(),
+ mImpl->mSourceStateItem->mSinks.end(),
+ sink), mImpl->mSourceStateItem->mSinks.end());
+
+ mImpl->mSession->replicateState(0, 0, mImpl->mSourceStateItem);
+}
+
+/**
+ * Implementation of the getSinks method as defined in MediaIf.ice
+ */
+AsteriskSCF::Media::V1::StreamSinkSeq StreamSourceRTPImpl::getSinks(const Ice::Current&)
+{
+ return mImpl->mSourceStateItem->mSinks;
}
/**
@@ -188,45 +212,75 @@ static void receiveRTP(void *userdata, void *packet, pj_ssize_t size)
return;
}
- if (source->mImpl->mSourceStateItem->mSink != 0)
+ // Update RTP stack information before writing to sinks, it's fine to do it
+ pjmedia_rtp_session_update(&source->mImpl->mIncomingSession, header, NULL);
+
+ if (source->mImpl->mSourceStateItem->mSinks.empty())
+ {
+ // No sinks present so frames can not go anywhere
+ return;
+ }
+
+ FormatPtr mediaformat = source->mImpl->mSession->getFormat(header->pt);
+
+ if (!mediaformat)
+ {
+ // If this is for a payload we don't know about just drop the frame
+ return;
+ }
+
+ FrameSeq frames;
+
+ AudioFormatPtr audioformat;
+ VideoFormatPtr videoformat;
+
+ if ((audioformat = AudioFormatPtr::dynamicCast(mediaformat)))
+ {
+ AudioFramePtr frame = new AudioFrame();
+ frame->mediaFormat = mediaformat;
+
+ // Populate the common data
+ frame->timestamp = header->ts;
+ frame->seqno = header->seq;
+
+ // Copy the payload from the RTP packet into the frame
+ copy(payload, payload + payload_size, std::back_inserter(frame->payload));
+
+ // Into the frames sequence it goes
+ frames.push_back(frame);
+ }
+ else if ((videoformat = VideoFormatPtr::dynamicCast(mediaformat)))
+ {
+ VideoFramePtr frame = new VideoFrame();
+ frame->mediaFormat = mediaformat;
+ frame->timestamp = header->ts;
+ frame->seqno = header->seq;
+ copy(payload, payload + payload_size, std::back_inserter(frame->payload));
+ frames.push_back(frame);
+ }
+
+ if (frames.empty())
{
- FormatPtr mediaformat = source->mImpl->mSession->getFormat(header->pt);
+ // If the media format ended up being a type we don't understand don't bother writing it out
+ return;
+ }
- if (mediaformat != 0)
+ boost::shared_lock<boost::shared_mutex> lock(source->mImpl->mLock);
+
+ for (StreamSinkSeq::iterator sink = source->mImpl->mSourceStateItem->mSinks.begin();
+ sink != source->mImpl->mSourceStateItem->mSinks.end();
+ ++sink)
+ {
+ try
+ {
+ (*sink)->write(frames);
+ }
+ catch (const Ice::Exception&)
{
- FrameSeq frames;
-
- AudioFormatPtr audioformat;
-
- if ((audioformat = AudioFormatPtr::dynamicCast(mediaformat)))
- {
- AudioFramePtr frame = new AudioFrame();
- frame->mediaformat = mediaformat;
-
- /* Populate the common data */
- frame->timestamp = header->ts;
- frame->seqno = header->seq;
-
- /* Copy the payload from the RTP packet to the frame, yahoo! */
- copy(payload, payload + payload_size, std::back_inserter(frame->payload));
-
- /* Into the sequence it goes, yarrrrrrrrrr matey */
- frames.push_back(frame);
- }
-
- try
- {
- source->mImpl->mSourceStateItem->mSink->write(frames);
- }
- catch (const Ice::Exception&)
- {
- lg(Error) << "Exception caught while attempting to write media to an RTP sink";
- }
+ lg(Error) << "Exception caught while attempting to write media to RTP sink " << (*sink);
}
}
- /* Now that all is said and done update the internal RTP stack state */
- pjmedia_rtp_session_update(&source->mImpl->mIncomingSession, header, NULL);
}
/**
@@ -280,3 +334,13 @@ RtpStreamSourceStateItemPtr StreamSourceRTPImpl::getStateItem()
{
return mImpl->mSourceStateItem;
}
+
+/**
+ * API call which sets the sinks.
+ */
+void StreamSourceRTPImpl::setSinks(const AsteriskSCF::Media::V1::StreamSinkSeq& sinks)
+{
+ boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
+
+ mImpl->mSourceStateItem->mSinks = sinks;
+}
diff --git a/src/RTPSource.h b/src/RTPSource.h
index 9da0b2a..f032f88 100644
--- a/src/RTPSource.h
+++ b/src/RTPSource.h
@@ -22,14 +22,17 @@ class StreamSourceRTPImpl : public AsteriskSCF::Media::RTP::V1::StreamSourceRTP
{
public:
StreamSourceRTPImpl(const RTPSessionImplPtr&, const std::string&);
- void setSink(const AsteriskSCF::Media::V1::StreamSinkPrx&, const Ice::Current&);
- AsteriskSCF::Media::V1::StreamSinkPrx getSink(const Ice::Current&);
+ void addSink(const AsteriskSCF::Media::V1::StreamSinkPrx&, const Ice::Current&);
+ void removeSink(const AsteriskSCF::Media::V1::StreamSinkPrx&, const Ice::Current&);
+ AsteriskSCF::Media::V1::StreamSinkSeq getSinks(const Ice::Current&);
AsteriskSCF::Media::V1::FormatSeq getFormats(const Ice::Current&);
std::string getId(const Ice::Current&);
void requestFormat(const AsteriskSCF::Media::V1::FormatPtr&, const Ice::Current&);
std::string getLocalAddress(const Ice::Current&);
Ice::Int getLocalPort(const Ice::Current&);
+
void setRemoteDetails(const std::string& address, Ice::Int port);
+ void setSinks(const AsteriskSCF::Media::V1::StreamSinkSeq&);
AsteriskSCF::Replication::MediaRTPPJMedia::V1::RtpStreamSourceStateItemPtr getStateItem();
/**
diff --git a/src/RtpStateReplicatorListener.cpp b/src/RtpStateReplicatorListener.cpp
index e3af713..d7a43f5 100644
--- a/src/RtpStateReplicatorListener.cpp
+++ b/src/RtpStateReplicatorListener.cpp
@@ -130,7 +130,7 @@ public:
mImpl->mStateItems.find(item->mSessionId);
if (i != mImpl->mStateItems.end())
{
- i->second->getSession()->getSource()->setSink(item->mSink, Ice::Current());
+ i->second->getSession()->getSource()->setSinks(item->mSinks);
}
}
};
diff --git a/test/TestRTPpjmedia.cpp b/test/TestRTPpjmedia.cpp
index 36eab74..212c58e 100644
--- a/test/TestRTPpjmedia.cpp
+++ b/test/TestRTPpjmedia.cpp
@@ -495,7 +495,7 @@ BOOST_AUTO_TEST_CASE(ConfirmInitialReplicatedRTPSink)
BOOST_AUTO_TEST_CASE(ConfirmInitialReplicatedRTPSource)
{
BOOST_CHECK(Testbed.mListener->mSource);
- BOOST_CHECK(!Testbed.mListener->mSource->mSink);
+ BOOST_CHECK(Testbed.mListener->mSource->mSinks.empty());
}
/**
@@ -645,7 +645,7 @@ BOOST_AUTO_TEST_CASE(TransmitFrametoEmptySink)
format->frameSize = 20;
AudioFramePtr frame = new AudioFrame();
- frame->mediaformat = format;
+ frame->mediaFormat = format;
/* Populate the payload with some useless data, but enough to confirm the payload passes unaltered. */
frame->payload.push_back('a');
@@ -735,9 +735,9 @@ BOOST_AUTO_TEST_CASE(ConfirmSinkSetting)
{
StreamSourceRTPPrx source = StreamSourceRTPPrx::checkedCast((*i));
- source->setSink(Testbed.sink);
+ source->addSink(Testbed.sink);
- StreamSinkPrx sink = source->getSink();
+ StreamSinkPrx sink = source->getSinks().front();
if (Testbed.sink == sink)
{
@@ -851,7 +851,7 @@ BOOST_AUTO_TEST_CASE(ConfirmPopulatedReplicatedRTPSink)
*/
BOOST_AUTO_TEST_CASE(ConfirmPopulatedReplicatedRTPSource)
{
- BOOST_CHECK(Testbed.mListener->mSource->mSink == Testbed.sink);
+ BOOST_CHECK(Testbed.mListener->mSource->mSinks.front() == Testbed.sink);
}
/**
@@ -869,7 +869,7 @@ BOOST_AUTO_TEST_CASE(TransmitandReceiveFrame)
format->frameSize = 20;
AudioFramePtr frame = new AudioFrame();
- frame->mediaformat = format;
+ frame->mediaFormat = format;
/* Populate the payload with some useless data, but enough to confirm the payload passes unaltered. */
frame->payload.push_back('a');
@@ -901,10 +901,10 @@ BOOST_AUTO_TEST_CASE(TransmitandReceiveFrame)
AudioFramePtr received_frame;
if (Testbed.frames.size() == 1 &&
(received_frame = AudioFramePtr::dynamicCast(Testbed.frames.front())) &&
- (received_frame->mediaformat->name == format->name))
+ (received_frame->mediaFormat->name == format->name))
{
AudioFormatPtr received_format;
- if ((received_format = AudioFormatPtr::dynamicCast(received_frame->mediaformat)) &&
+ if ((received_format = AudioFormatPtr::dynamicCast(received_frame->mediaFormat)) &&
(received_format->frameSize == format->frameSize) &&
(received_frame->payload == frame->payload))
{
@@ -938,7 +938,7 @@ BOOST_AUTO_TEST_CASE(TransmitFrameWithUnsupportedMediaFormat)
format->frameSize = 20;
AudioFramePtr frame = new AudioFrame();
- frame->mediaformat = format;
+ frame->mediaFormat = format;
frame->payload.push_back('a');
frame->payload.push_back('b');
@@ -1015,7 +1015,7 @@ BOOST_AUTO_TEST_CASE(ReceiveUnknownRTPPacket)
sink->setRemoteDetails(address, port);
AudioFramePtr frame = new AudioFrame();
- frame->mediaformat = format;
+ frame->mediaFormat = format;
/* Populate the payload with some useless data, but enough to confirm the payload passes unaltered. */
frame->payload.push_back('a');
-----------------------------------------------------------------------
--
asterisk-scf/release/media_rtp_pjmedia.git
More information about the asterisk-scf-commits mailing list