[asterisk-scf-commits] asterisk-scf/release/media_rtp_pjmedia.git branch "master" updated.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Mon Aug 1 09:51:58 CDT 2011
branch "master" has been updated
via cdfa9be9fd76f722c1a801b8ecd0f1960ae543bd (commit)
from 3f858d88944b37a662d7cb7448f95eed6992bad0 (commit)
Summary of changes:
.../MediaRTPPJMedia/RtpStateReplicationIf.ice | 3 +
src/RTPSession.cpp | 273 ++++++++++++++++++-
src/RTPSession.h | 1 +
src/RTPSink.cpp | 3 +
src/RTPSource.cpp | 232 ++++++++++++++++-
src/RTPSource.h | 8 +-
src/RtpStateReplicatorListener.cpp | 2 +-
src/SessionAdapter.h | 8 +
test/TestRTPpjmedia.cpp | 277 +++++++++++++++++++-
9 files changed, 780 insertions(+), 27 deletions(-)
- Log -----------------------------------------------------------------
commit cdfa9be9fd76f722c1a801b8ecd0f1960ae543bd
Author: Joshua Colp <jcolp at digium.com>
Date: Mon Aug 1 11:51:26 2011 -0300
Add support for sending and receiving of RTCP. This also adds support for getting statistic information from RTP originated sinks and sources.
diff --git a/slice/AsteriskSCF/Replication/MediaRTPPJMedia/RtpStateReplicationIf.ice b/slice/AsteriskSCF/Replication/MediaRTPPJMedia/RtpStateReplicationIf.ice
index ff9d0d2..f3aae1f 100644
--- a/slice/AsteriskSCF/Replication/MediaRTPPJMedia/RtpStateReplicationIf.ice
+++ b/slice/AsteriskSCF/Replication/MediaRTPPJMedia/RtpStateReplicationIf.ice
@@ -91,6 +91,8 @@ module V1
AsteriskSCF::Media::RTP::V1::PayloadMap mPayloadstoFormats;
bool mIPv6;
bool mSRTP;
+ string mRemoteRtcpAddress;
+ int mRemoteRtcpPort;
};
class RtpStreamSinkStateItem extends RtpStateItem
@@ -104,6 +106,7 @@ module V1
{
AsteriskSCF::Media::V1::StreamSinkSeq mSinks;
};
+
}; /* module V1 */
}; /* module MediaRTPPJMedia */
diff --git a/src/RTPSession.cpp b/src/RTPSession.cpp
index 88fca37..69c1fea 100644
--- a/src/RTPSession.cpp
+++ b/src/RTPSession.cpp
@@ -33,18 +33,108 @@
#include <AsteriskSCF/Media/MediaIf.h>
#include <AsteriskSCF/Media/RTP/MediaRTPIf.h>
+#include <AsteriskSCF/Media/RTP/MediaRTCPIf.h>
#include <AsteriskSCF/System/Component/ReplicaIf.h>
using namespace std;
using namespace AsteriskSCF::Core::Discovery::V1;
using namespace AsteriskSCF::Media::V1;
using namespace AsteriskSCF::Media::RTP::V1;
+using namespace AsteriskSCF::Media;
using namespace AsteriskSCF::Replication::MediaRTPPJMedia::V1;
using namespace AsteriskSCF::System::Component::V1;
using namespace AsteriskSCF::Discovery;
using namespace AsteriskSCF::PJMediaRTP;
/**
+ * RTCP Information Interface implementation.
+ */
+class RTCPInformationImpl : public RTCP::V1::Information
+{
+public:
+ RTCPInformationImpl(pjmedia_rtcp_stat *general, pjmedia_rtcp_stream_stat *stream) :
+ mGeneralStatistics(general), mStreamStatistics(stream) { }
+
+ RTCP::V1::StatisticsPtr getStatistics(const Ice::Current&)
+ {
+ RTCP::V1::StatisticsPtr statistics = new RTCP::V1::Statistics();
+
+ statistics->roundTripDelay = new RTCP::V1::ExtendedDetails();
+ statistics->roundTripDelay->maximum = mGeneralStatistics->rtt.max;
+ statistics->roundTripDelay->minimum = mGeneralStatistics->rtt.min;
+ statistics->roundTripDelay->last = mGeneralStatistics->rtt.last;
+ statistics->roundTripDelay->mean = mGeneralStatistics->rtt.mean;
+
+ statistics->packets = mStreamStatistics->pkt;
+ statistics->discardedPackets = mStreamStatistics->discard;
+ statistics->lostPackets = mStreamStatistics->loss;
+ statistics->outOfOrderPackets = mStreamStatistics->reorder;
+ statistics->duplicatePackets = mStreamStatistics->dup;
+
+ statistics->loss = new RTCP::V1::ExtendedDetails();
+ statistics->loss->maximum = mStreamStatistics->loss_period.max;
+ statistics->loss->minimum = mStreamStatistics->loss_period.min;
+ statistics->loss->last = mStreamStatistics->loss_period.last;
+ statistics->loss->mean = mStreamStatistics->loss_period.mean;
+
+ statistics->jitter = new RTCP::V1::ExtendedDetails();
+ statistics->jitter->maximum = mStreamStatistics->jitter.max;
+ statistics->jitter->minimum = mStreamStatistics->jitter.min;
+ statistics->jitter->last = mStreamStatistics->jitter.last;
+ statistics->jitter->mean = mStreamStatistics->jitter.mean;
+
+ return statistics;
+ }
+
+ void addListener(const RTCP::V1::InformationListenerPrx& listener, const Ice::Current&)
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ mListeners.push_back(listener);
+ }
+
+ void removeListener(const RTCP::V1::InformationListenerPrx& listener, const Ice::Current&)
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ mListeners.erase(std::remove(mListeners.begin(), mListeners.end(), listener), mListeners.end());
+ }
+
+ /**
+ * Implementation specific function which returns a copy of the listeners.
+ */
+ std::vector<RTCP::V1::InformationListenerPrx> getListeners()
+ {
+ boost::shared_lock<boost::shared_mutex> lock(mLock);
+ return mListeners;
+ }
+
+private:
+ /**
+ * Lock to protect the listeners.
+ */
+ boost::shared_mutex mLock;
+
+ /**
+ * Listeners present.
+ */
+ std::vector<RTCP::V1::InformationListenerPrx> mListeners;
+
+ /**
+ * Structure where general RTCP information is.
+ */
+ pjmedia_rtcp_stat *mGeneralStatistics;
+
+ /**
+ * Structure where stream RTCP information is.
+ */
+ pjmedia_rtcp_stream_stat *mStreamStatistics;
+};
+
+/**
+ * Smart pointer for the above RTCPInformationImpl class.
+ */
+typedef IceUtil::Handle<RTCPInformationImpl> RTCPInformationImplPtr;
+
+/**
* Implementation of the RTPSession interface as defined in MediaRTPIf.ice
*/
class RTPSessionImpl : public AsteriskSCF::Media::RTP::V1::SRTPSession
@@ -99,6 +189,8 @@ public:
*/
StreamSourceRTPImplPtr getSourceServant();
StreamSinkRTPImplPtr getSinkServant();
+ PJMediaTransportPtr getTransport();
+
void replicateState(const AsteriskSCF::Replication::MediaRTPPJMedia::V1::RtpSessionStateItemPtr&,
const AsteriskSCF::Replication::MediaRTPPJMedia::V1::RtpStreamSinkStateItemPtr&,
@@ -108,6 +200,12 @@ public:
const AsteriskSCF::Replication::MediaRTPPJMedia::V1::RtpStreamSourceStateItemPtr&);
void associatePayloadsImpl(const AsteriskSCF::Media::RTP::V1::PayloadMap& payloadMap);
+ void setRemoteRtcpDetails(const std::string&, Ice::Int);
+ pjmedia_rtcp_session* getRtcpSession();
+ std::vector<AsteriskSCF::Media::RTCP::V1::InformationListenerPrx> getReceiverReportListeners();
+ AsteriskSCF::Media::RTCP::V1::StatisticsPtr getReceiverReportStatistics();
+ std::vector<AsteriskSCF::Media::RTCP::V1::InformationListenerPrx> getSenderReportListeners();
+ AsteriskSCF::Media::RTCP::V1::StatisticsPtr getSenderReportStatistics();
RTPSessionPrx activate(const string& id);
RTPSessionPrx activate(const Ice::Identity& id, const Ice::Identity& sourceId, const Ice::Identity& sinkId);
@@ -193,6 +291,26 @@ private:
* A proxy to the state replicator where we are sending updates to.
*/
AsteriskSCF::Discovery::SmartProxy<RtpStateReplicatorPrx> mStateReplicator;
+
+ /**
+ * RTCP session for this RTP session.
+ */
+ pjmedia_rtcp_session mRtcpSession;
+
+ /**
+ * A pointer to the RTCP session interface.
+ */
+ RTCP::V1::RTCPSessionPtr mRtcpSessionInterface;
+
+ /**
+ * RTCP information for Receiver Report.
+ */
+ RTCPInformationImplPtr mReceiverReport;
+
+ /**
+ * RTCP information for Sender Report.
+ */
+ RTCPInformationImplPtr mSenderReport;
};
/**
@@ -200,6 +318,42 @@ private:
*/
typedef IceUtil::Handle<RTPSessionImpl> RTPSessionImplPtr;
+/**
+ * Implementation of the RTCPSession interface as defined in MediaRTCPIf.ice
+ */
+class RTCPSessionImpl : public RTCP::V1::RTCPSession
+{
+public:
+ /**
+ * Constructor for this implementation.
+ */
+ RTCPSessionImpl(const RTPSessionImplPtr& session) : mSession(session) { }
+
+ /**
+ * Method used to retrieve the port our RTCP session is listening on.
+ */
+ int getLocalPort(const Ice::Current&)
+ {
+ pjmedia_transport_info transportInfo;
+
+ pjmedia_transport_info_init(&transportInfo);
+ pjmedia_transport_get_info(mSession->getTransport()->getTransport(), &transportInfo);
+
+ return pj_sockaddr_get_port(&transportInfo.sock_info.rtcp_addr_name);
+ }
+
+ void setRemoteDetails(const std::string& address, Ice::Int port, const Ice::Current&)
+ {
+ mSession->setRemoteRtcpDetails(address, port);
+ }
+
+private:
+ /**
+ * Pointer to the RTP session.
+ */
+ RTPSessionImplPtr mSession;
+};
+
//
// Provides an adapter to the session implementation without having to expose the entire header file. This is a
// step-wise decoupling of the session implementation from the other objects in this component. The whole
@@ -243,6 +397,31 @@ public:
mServant->setRemoteDetails(host, port);
}
+ pjmedia_rtcp_session* getRtcpSession()
+ {
+ return mServant->getRtcpSession();
+ }
+
+ std::vector<AsteriskSCF::Media::RTCP::V1::InformationListenerPrx> getReceiverReportListeners()
+ {
+ return mServant->getReceiverReportListeners();
+ }
+
+ AsteriskSCF::Media::RTCP::V1::StatisticsPtr getReceiverReportStatistics()
+ {
+ return mServant->getReceiverReportStatistics();
+ }
+
+ std::vector<AsteriskSCF::Media::RTCP::V1::InformationListenerPrx> getSenderReportListeners()
+ {
+ return mServant->getSenderReportListeners();
+ }
+
+ AsteriskSCF::Media::RTCP::V1::StatisticsPtr getSenderReportStatistics()
+ {
+ return mServant->getSenderReportStatistics();
+ }
+
private:
RTPSessionImplPtr mServant;
};
@@ -301,6 +480,8 @@ RTPSessionImpl::RTPSessionImpl(const Ice::ObjectAdapterPtr& adapter,
mSessionStateItem->mSessionIdentity = mAdapter->getCommunicator()->stringToIdentity(mId);
mSessionStateItem->mFormats = params->formats;
mSessionStateItem->mIPv6 = params->ipv6;
+
+ pjmedia_rtcp_init(&mRtcpSession, NULL, 8000, 160, 0);
}
/**
@@ -340,6 +521,8 @@ RTPSessionImpl::RTPSessionImpl(const Ice::ObjectAdapterPtr& adapter,
{
mTransport = SRTPTransport::create(mTransport, mEndpoint, configurationService);
}
+
+ pjmedia_rtcp_init(&mRtcpSession, NULL, 8000, 160, 0);
}
/**
@@ -383,16 +566,6 @@ std::string RTPSessionImpl::getId(const Ice::Current&)
}
/**
- * Implementation of the useRTCP method as defined in MediaRTPIf.ice
- */
-void RTPSessionImpl::useRTCP(bool, const Ice::Current&)
-{
- //
- // TODO.
- //
-}
-
-/**
* Implementation of the release method as defined in MediaRTPIf.ice
*/
void RTPSessionImpl::release(const Ice::Current&)
@@ -404,7 +577,7 @@ void RTPSessionImpl::release(const Ice::Current&)
mAdapter->remove(mStreamSourceProxy->ice_getIdentity());
mAdapter->remove(mStreamSinkProxy->ice_getIdentity());
- /* Since both the source and sink have a pointer back to the session we need to get rid of them,
+ /* Since the source and sink have a pointer back to the session we need to get rid of them,
* which will in turn get rid of ourselves once we are removed from the ASM.
*/
mStreamSource = 0;
@@ -450,6 +623,56 @@ void RTPSessionImpl::start(const string& suiteName, const string& key, bool enab
}
/**
+ * API call which returns the RTCP session used for this RTP session.
+ *
+ * @return A pointer to the RTCP session.
+ */
+pjmedia_rtcp_session* RTPSessionImpl::getRtcpSession()
+{
+ return &mRtcpSession;
+}
+
+/**
+ * API call which returns the listeners wanting statistics for receiving as they change.
+ *
+ * @return A copy of the listeners wanting the statistics.
+ */
+std::vector<AsteriskSCF::Media::RTCP::V1::InformationListenerPrx> RTPSessionImpl::getReceiverReportListeners()
+{
+ return mReceiverReport->getListeners();
+}
+
+/**
+ * API call which returns current statistics for receiving.
+ *
+ * @return The current receiving statistics.
+ */
+AsteriskSCF::Media::RTCP::V1::StatisticsPtr RTPSessionImpl::getReceiverReportStatistics()
+{
+ return mReceiverReport->getStatistics(Ice::Current());
+}
+
+/**
+ * API call which returns the listeners wanting statistics for sending as they change.
+ *
+ * @return A copy of the listeners wanting the statistics.
+ */
+std::vector<AsteriskSCF::Media::RTCP::V1::InformationListenerPrx> RTPSessionImpl::getSenderReportListeners()
+{
+ return mSenderReport->getListeners();
+}
+
+/**
+ * API call which returns current statistics for sending.
+ *
+ * @return The current sending statistics.
+ */
+AsteriskSCF::Media::RTCP::V1::StatisticsPtr RTPSessionImpl::getSenderReportStatistics()
+{
+ return mSenderReport->getStatistics(Ice::Current());
+}
+
+/**
* API call which returns the formats the RTP session is expected to carry.
*
* @return A sequence of media formats.
@@ -468,6 +691,19 @@ void RTPSessionImpl::setRemoteDetails(const string& address, Ice::Int port)
}
/**
+ * API call which calls into RTPSourceImpl in order to setup transport.
+ */
+void RTPSessionImpl::setRemoteRtcpDetails(const std::string& address, Ice::Int port)
+{
+ mSessionStateItem->mRemoteRtcpAddress = address;
+ mSessionStateItem->mRemoteRtcpPort = port;
+
+ mStreamSource->setRemoteRtcpDetails(address, port);
+
+ replicateState(mSessionStateItem, 0, 0);
+}
+
+/**
* API call which returns a media format based on payload.
*
* @return The media format corresponding to the payload.
@@ -511,6 +747,11 @@ StreamSinkRTPImplPtr RTPSessionImpl::getSinkServant()
return mStreamSink;
}
+PJMediaTransportPtr RTPSessionImpl::getTransport()
+{
+ return mTransport;
+}
+
/**
* API call which replicates state items.
*/
@@ -623,10 +864,17 @@ RTPSessionPrx RTPSessionImpl::activate(const Ice::Identity& id, const Ice::Ident
mSessionAdapter.reset(new SessionAdapterImpl(this));
try
{
- mStreamSource = new StreamSourceRTPImpl(mSessionAdapter, mTransport, mId);
+ mStreamSource = new StreamSourceRTPImpl(mSessionAdapter, mTransport, mId, StreamSourceRTPPrx::uncheckedCast(mAdapter->createDirectProxy(sourceId)),
+ StreamSinkRTPPrx::uncheckedCast(mAdapter->createDirectProxy(sinkId)));
mStreamSink = new StreamSinkRTPImpl(mSessionAdapter, mTransport, mId);
+ mRtcpSessionInterface = new RTCPSessionImpl(this);
+ mReceiverReport = new RTCPInformationImpl(&mRtcpSession.stat, &mRtcpSession.stat.rx);
+ mSenderReport = new RTCPInformationImpl(&mRtcpSession.stat, &mRtcpSession.stat.tx);
mStreamSourceProxy = StreamSourceRTPPrx::uncheckedCast(mAdapter->add(mStreamSource, sourceId));
mStreamSinkProxy = StreamSinkRTPPrx::uncheckedCast(mAdapter->add(mStreamSink, sinkId));
+ mAdapter->addFacet(mRtcpSessionInterface, id, RTCP::V1::SessionFacet);
+ mAdapter->addFacet(mReceiverReport, sourceId, RTCP::V1::Facet);
+ mAdapter->addFacet(mSenderReport, sinkId, RTCP::V1::Facet);
if (mSessionStateItem)
{
@@ -686,6 +934,7 @@ public:
void update(const RtpSessionStateItemPtr& item)
{
mImpl->associatePayloadsImpl(item->mPayloadstoFormats);
+ mImpl->setRemoteRtcpDetails(item->mRemoteRtcpAddress, item->mRemoteRtcpPort);
}
void update(const RtpStreamSinkStateItemPtr& item)
diff --git a/src/RTPSession.h b/src/RTPSession.h
index b6f6d54..a4f8072 100644
--- a/src/RTPSession.h
+++ b/src/RTPSession.h
@@ -19,6 +19,7 @@
#include <AsteriskSCF/Discovery/SmartProxy.h>
#include <boost/shared_ptr.hpp>
+
namespace AsteriskSCF
{
namespace PJMediaRTP
diff --git a/src/RTPSink.cpp b/src/RTPSink.cpp
index 7d3d07b..8184c1a 100644
--- a/src/RTPSink.cpp
+++ b/src/RTPSink.cpp
@@ -154,6 +154,9 @@ void StreamSinkRTPImpl::write(const AsteriskSCF::Media::V1::FrameSeq& frames, co
/* TODO: Transmission failed... what to do? */
continue;
}
+
+ // Update RTCP information
+ pjmedia_rtcp_tx_rtp(mImpl->mSessionAdapter->getRtcpSession(), static_cast<unsigned int>((*frame)->payload.size()));
}
}
diff --git a/src/RTPSource.cpp b/src/RTPSource.cpp
index ed3ad97..4ae46f9 100644
--- a/src/RTPSource.cpp
+++ b/src/RTPSource.cpp
@@ -22,16 +22,19 @@
#include <Ice/Ice.h>
#include <IceUtil/UUID.h>
+#include <IceUtil/Timer.h>
#include <boost/thread.hpp>
#include <AsteriskSCF/Media/MediaIf.h>
#include <AsteriskSCF/Media/RTP/MediaRTPIf.h>
+#include <AsteriskSCF/Media/RTP/MediaRTCPIf.h>
#include <AsteriskSCF/logger.h>
#include <AsteriskSCF/System/Component/ReplicaIf.h>
using namespace std;
using namespace AsteriskSCF::Core::Discovery::V1;
+using namespace AsteriskSCF::Media;
using namespace AsteriskSCF::Media::V1;
using namespace AsteriskSCF::Media::RTP::V1;
using namespace AsteriskSCF::Replication::MediaRTPPJMedia::V1;
@@ -44,6 +47,63 @@ Logger lg = getLoggerFactory().getLogger("AsteriskSCF.MediaRTP");
}
/**
+ * TimerTask implementation which sends RTCP at a defined interval.
+ */
+class RtcpTransmission : public IceUtil::TimerTask
+{
+public:
+ RtcpTransmission(const SessionAdapterPtr& sessionAdapter, const StreamSinkRTPPrx& sink, const PJMediaTransportPtr& transport) :
+ mSessionAdapter(sessionAdapter), mSink(sink), mTransport(transport) { }
+
+ void runTimerTask()
+ {
+ void *packet;
+ int packet_size;
+
+ pjmedia_rtcp_build_rtcp(mSessionAdapter->getRtcpSession(), &packet, &packet_size);
+ pjmedia_transport_send_rtcp(mTransport->getTransport(), packet, packet_size);
+
+ std::vector<RTCP::V1::InformationListenerPrx> listeners = mSessionAdapter->getSenderReportListeners();
+
+ // If no listeners exist don't bother getting the statistics
+ if (listeners.empty())
+ {
+ return;
+ }
+
+ RTCP::V1::StatisticsPtr statistics = mSessionAdapter->getSenderReportStatistics();
+
+ for (std::vector<RTCP::V1::InformationListenerPrx>::const_iterator listener = listeners.begin();
+ listener != listeners.end();
+ ++listener)
+ {
+ (*listener)->sinkStatisticsUpdated(mSink, statistics);
+ }
+ }
+
+private:
+ /**
+ * A pointer to the session adapter.
+ */
+ SessionAdapterPtr mSessionAdapter;
+
+ /**
+ * Proxy to the sink.
+ */
+ StreamSinkRTPPrx mSink;
+
+ /**
+ * Pointer to the transport used for communication.
+ */
+ PJMediaTransportPtr mTransport;
+};
+
+/**
+ * Smart pointer for the above RtcpTransmission class.
+ */
+typedef IceUtil::Handle<RtcpTransmission> RtcpTransmissionPtr;
+
+/**
* Private implementation details for the StreamSourceRTPImpl class.
*/
class StreamSourceRTPImplPriv
@@ -53,8 +113,16 @@ public:
* Constructor for our StreamSourceRTPImplPriv class.
*/
StreamSourceRTPImplPriv(const SessionAdapterPtr& sessionAdapter,
- const PJMediaTransportPtr& transport,
- const string& parentSessionId);
+ const PJMediaTransportPtr& transport,
+ const string& parentSessionId,
+ const StreamSourceRTPPrx& source,
+ const StreamSinkRTPPrx& sink
+ );
+
+ /**
+ * Destructor for our StreamSourceRTPImplPriv class.
+ */
+ ~StreamSourceRTPImplPriv();
/**
* A structure containing incoming pjmedia session data.
@@ -78,6 +146,21 @@ public:
string mSessionId;
/**
+ * Timer used for sending RTCP reports.
+ */
+ IceUtil::TimerPtr mTimer;
+
+ /**
+ * Source of media, conveyed to RTCP listeners.
+ */
+ StreamSourceRTPPrx mSource;
+
+ /**
+ * Sink for media, conveyed to RTCP listeners.
+ */
+ StreamSinkRTPPrx mSink;
+
+ /**
* Lock that protects information contained.
*/
boost::shared_mutex mLock;
@@ -87,23 +170,41 @@ public:
* Constructor for the StreamSourceRTPImplPriv class.
*/
StreamSourceRTPImplPriv::StreamSourceRTPImplPriv(const SessionAdapterPtr& session,
- const PJMediaTransportPtr& transport,
- const string& sessionId) :
+ const PJMediaTransportPtr& transport,
+ const string& sessionId,
+ const StreamSourceRTPPrx& source,
+ const StreamSinkRTPPrx& sink) :
mSessionAdapter(session), mTransport(transport),
mSourceStateItem(new RtpStreamSourceStateItem),
- mSessionId(sessionId)
+ mSessionId(sessionId),
+ mSource(source),
+ mSink(sink)
{
pjmedia_rtp_session_init(&mIncomingSession, 0, 0);
mSourceStateItem->mSessionId = sessionId;
mSourceStateItem->key = IceUtil::generateUUID();
-};
+}
+
+/**
+ * Destructor for the StreamSourceRTPImplPriv class.
+ */
+StreamSourceRTPImplPriv::~StreamSourceRTPImplPriv()
+{
+ // Destroy the RTCP transmission timer if it exists
+ if (mTimer)
+ {
+ mTimer->destroy();
+ }
+}
/**
* Constructor for the StreamSourceRTPImpl class.
*/
StreamSourceRTPImpl::StreamSourceRTPImpl(const SessionAdapterPtr& session,
- const PJMediaTransportPtr& transport, const string& sessionId) :
- mImpl(new StreamSourceRTPImplPriv(session, transport, sessionId))
+ const PJMediaTransportPtr& transport, const string& sessionId,
+ const StreamSourceRTPPrx& source,
+ const StreamSinkRTPPrx& sink) :
+ mImpl(new StreamSourceRTPImplPriv(session, transport, sessionId, source, sink))
{
}
@@ -233,7 +334,27 @@ static void receiveRTP(void *userdata, void *packet, pj_ssize_t size)
}
// Update RTP stack information before writing to sinks, it's fine to do it
- pjmedia_rtp_session_update(&source->mImpl->mIncomingSession, header, NULL);
+ pjmedia_rtp_status rtpStatus;
+ pjmedia_rtp_session_update2(&source->mImpl->mIncomingSession, header, &rtpStatus, PJ_FALSE);
+
+ // Update RTCP information
+ pjmedia_rtcp_rx_rtp2(source->mImpl->mSessionAdapter->getRtcpSession(), pj_ntohs(header->seq), pj_ntohl(header->ts),
+ payload_size, ((rtpStatus.status.value && rtpStatus.status.flag.bad) || !payload_size)
+ ? PJ_TRUE : PJ_FALSE);
+
+ // If the SSRC has changed contact any appropriate listeners
+ if (rtpStatus.status.value && rtpStatus.status.flag.badssrc)
+ {
+ std::vector<AsteriskSCF::Media::RTCP::V1::InformationListenerPrx> listeners =
+ source->mImpl->mSessionAdapter->getReceiverReportListeners();
+
+ for (std::vector<AsteriskSCF::Media::RTCP::V1::InformationListenerPrx>::const_iterator listener = listeners.begin();
+ listener != listeners.end();
+ ++listener)
+ {
+ (*listener)->sourceSsrcChanged(source->mImpl->mSource, source->mImpl->mIncomingSession.peer_ssrc);
+ }
+ }
if (source->mImpl->mSourceStateItem->mSinks.empty())
{
@@ -304,6 +425,40 @@ static void receiveRTP(void *userdata, void *packet, pj_ssize_t size)
}
/**
+ * Function which is called when RTCP is received.
+ */
+static void receiveRTCP(void *userdata, void *packet, pj_ssize_t size)
+{
+ StreamSourceRTPImpl* source = static_cast<StreamSourceRTPImpl*>(userdata);
+
+ /* Ensure that no errors occurred when reading this packet in */
+ if (size < 0)
+ {
+ lg(Error) << "We attempted to read data from an RTCP session but failed.";
+ return;
+ }
+
+ pjmedia_rtcp_rx_rtcp(source->mImpl->mSessionAdapter->getRtcpSession(), packet, size);
+
+ std::vector<AsteriskSCF::Media::RTCP::V1::InformationListenerPrx> listeners =
+ source->mImpl->mSessionAdapter->getReceiverReportListeners();
+
+ if (listeners.empty())
+ {
+ return;
+ }
+
+ AsteriskSCF::Media::RTCP::V1::StatisticsPtr statistics = source->mImpl->mSessionAdapter->getReceiverReportStatistics();
+
+ for (std::vector<AsteriskSCF::Media::RTCP::V1::InformationListenerPrx>::const_iterator listener = listeners.begin();
+ listener != listeners.end();
+ ++listener)
+ {
+ (*listener)->sourceStatisticsUpdated(source->mImpl->mSource, statistics);
+ }
+}
+
+/**
* API call which sets up our pjmedia transport and allows media to be sent and received.
*/
void StreamSourceRTPImpl::setRemoteDetails(const string& address, Ice::Int port)
@@ -339,7 +494,8 @@ void StreamSourceRTPImpl::setRemoteDetails(const string& address, Ice::Int port)
pjmedia_transport_detach(mImpl->mTransport->getTransport(), this);
/* All ready... actually do it! */
- status = pjmedia_transport_attach(mImpl->mTransport->getTransport(), this, &addr, NULL, pj_sockaddr_get_len(&addr), &receiveRTP, NULL);
+ status = pjmedia_transport_attach(mImpl->mTransport->getTransport(), this, &addr, &transportInfo.src_rtcp_name,
+ pj_sockaddr_get_len(&addr), &receiveRTP, &receiveRTCP);
if (status != PJ_SUCCESS)
{
@@ -348,6 +504,62 @@ void StreamSourceRTPImpl::setRemoteDetails(const string& address, Ice::Int port)
}
/**
+ * API call which sets up our pjmedia transport and allows media to be sent and received.
+ */
+void StreamSourceRTPImpl::setRemoteRtcpDetails(const std::string& address, Ice::Int port)
+{
+ pj_sockaddr addr;
+
+ /* This feels so dirty but convert from our std::string to a pj_str, since their API requires it. */
+ pj_str_t tmpAddress;
+ pj_strset(&tmpAddress, (char*)address.c_str(), address.size());
+
+ /* Now for the next trick - convert into a pj_sockaddr so we can pass it to pjmedia_transport_attach */
+ pj_status_t status = pj_sockaddr_parse(pj_AF_UNSPEC(), 0, &tmpAddress, &addr);
+
+ if (status != PJ_SUCCESS)
+ {
+ throw InvalidAddress();
+ }
+
+ // Confirm that the address family of the address matches that of this RTP session
+ pjmedia_transport_info transportInfo;
+
+ pjmedia_transport_info_init(&transportInfo);
+ pjmedia_transport_get_info(mImpl->mTransport->getTransport(), &transportInfo);
+
+ if (transportInfo.sock_info.rtcp_addr_name.addr.sa_family != addr.addr.sa_family)
+ {
+ throw InvalidAddress();
+ }
+
+ pj_sockaddr_set_port(&addr, static_cast<pj_uint16_t>(port));
+
+ /* In case we were already attached go ahead and detach */
+ pjmedia_transport_detach(mImpl->mTransport->getTransport(), this);
+
+ /* All ready... actually do it! */
+ status = pjmedia_transport_attach(mImpl->mTransport->getTransport(), this, &transportInfo.src_rtp_name, &addr,
+ pj_sockaddr_get_len(&addr), &receiveRTP, &receiveRTCP);
+
+ if (status != PJ_SUCCESS)
+ {
+ throw InvalidAddress();
+ }
+
+ // If RTCP is not already being sent start sending it
+ if (!mImpl->mTimer && (mImpl->mTimer = new IceUtil::Timer()))
+ {
+ RtcpTransmissionPtr transmission;
+
+ if ((transmission = new RtcpTransmission(mImpl->mSessionAdapter, mImpl->mSink, mImpl->mTransport)))
+ {
+ mImpl->mTimer->scheduleRepeated(transmission, IceUtil::Time::milliSeconds(PJMEDIA_RTCP_INTERVAL));
+ }
+ }
+}
+
+/**
* API call which returns a pointer to the source state item.
*/
RtpStreamSourceStateItemPtr StreamSourceRTPImpl::getStateItem()
diff --git a/src/RTPSource.h b/src/RTPSource.h
index f100cd4..aeeff2c 100644
--- a/src/RTPSource.h
+++ b/src/RTPSource.h
@@ -25,8 +25,10 @@ class StreamSourceRTPImpl : public AsteriskSCF::Media::RTP::V1::StreamSourceRTP
{
public:
StreamSourceRTPImpl(const AsteriskSCF::PJMediaRTP::SessionAdapterPtr& sessionAdapter,
- const AsteriskSCF::PJMediaRTP::PJMediaTransportPtr& transport,
- const std::string& parentSessionId);
+ const AsteriskSCF::PJMediaRTP::PJMediaTransportPtr& transport,
+ const std::string& parentSessionId,
+ const AsteriskSCF::Media::RTP::V1::StreamSourceRTPPrx& source,
+ const AsteriskSCF::Media::RTP::V1::StreamSinkRTPPrx& sink);
void addSink(const AsteriskSCF::Media::V1::StreamSinkPrx&, const Ice::Current&);
void removeSink(const AsteriskSCF::Media::V1::StreamSinkPrx&, const Ice::Current&);
@@ -39,6 +41,8 @@ public:
void setRemoteDetails(const std::string& address, Ice::Int port);
void setSinksImpl(const AsteriskSCF::Media::V1::StreamSinkSeq&);
+ void setRemoteRtcpDetails(const std::string&, Ice::Int);
+
AsteriskSCF::Replication::MediaRTPPJMedia::V1::RtpStreamSourceStateItemPtr getStateItem();
void setSinkImpl(const AsteriskSCF::Media::V1::StreamSinkPrx& proxy);
diff --git a/src/RtpStateReplicatorListener.cpp b/src/RtpStateReplicatorListener.cpp
index 4e1ab03..9ceb5a5 100644
--- a/src/RtpStateReplicatorListener.cpp
+++ b/src/RtpStateReplicatorListener.cpp
@@ -114,7 +114,7 @@ public:
{
localitem = i->second;
}
-
+
//
// TODO: This appears to happen in testing on occasion. Should verify if this should be
// expected.
diff --git a/src/SessionAdapter.h b/src/SessionAdapter.h
index d164bca..de017b2 100644
--- a/src/SessionAdapter.h
+++ b/src/SessionAdapter.h
@@ -20,10 +20,13 @@
#include <boost/shared_ptr.hpp>
#include "RtpStateReplicationIf.h"
+#include <AsteriskSCF/Media/RTP/MediaRTCPIf.h>
+
//
// XXX forward declaration that will be obsoleted soon.
//
struct pjmedia_transport;
+struct pjmedia_rtcp_session;
namespace AsteriskSCF
{
@@ -53,6 +56,11 @@ public:
virtual int getPayload(const AsteriskSCF::Media::V1::FormatPtr& format) = 0;
virtual AsteriskSCF::Media::V1::FormatSeq getFormats() = 0;
virtual void setRemoteDetails(const std::string& host, int port) = 0;
+ virtual pjmedia_rtcp_session* getRtcpSession() = 0;
+ virtual std::vector<AsteriskSCF::Media::RTCP::V1::InformationListenerPrx> getReceiverReportListeners() = 0;
+ virtual AsteriskSCF::Media::RTCP::V1::StatisticsPtr getReceiverReportStatistics() = 0;
+ virtual std::vector<AsteriskSCF::Media::RTCP::V1::InformationListenerPrx> getSenderReportListeners() = 0;
+ virtual AsteriskSCF::Media::RTCP::V1::StatisticsPtr getSenderReportStatistics() = 0;
};
} /* End of namespace PJMediaRTP */
diff --git a/test/TestRTPpjmedia.cpp b/test/TestRTPpjmedia.cpp
index 212c58e..ca15d65 100644
--- a/test/TestRTPpjmedia.cpp
+++ b/test/TestRTPpjmedia.cpp
@@ -21,6 +21,8 @@
#endif
#define BOOST_TEST_NO_MAIN
+#include <pjmedia.h>
+
#include <boost/test/unit_test.hpp>
#include <boost/test/debug.hpp>
#include <boost/thread/thread.hpp>
@@ -33,11 +35,13 @@
#include <AsteriskSCF/Core/Discovery/ServiceLocatorIf.h>
#include <AsteriskSCF/Media/MediaIf.h>
#include <AsteriskSCF/Media/RTP/MediaRTPIf.h>
+#include <AsteriskSCF/Media/RTP/MediaRTCPIf.h>
#include "RtpStateReplicationIf.h"
using namespace std;
using namespace AsteriskSCF::Core::Discovery::V1;
+using namespace AsteriskSCF::Media;
using namespace AsteriskSCF::Media::V1;
using namespace AsteriskSCF::Media::RTP::V1;
using namespace AsteriskSCF::Replication::MediaRTPPJMedia::V1;
@@ -77,6 +81,33 @@ public:
typedef IceUtil::Handle<TestRtpReplicatorListener> TestRtpReplicatorListenerPtr;
+class TestInformationListener : public RTCP::V1::InformationListener
+{
+public:
+ TestInformationListener() : mSsrcChanged(false) { }
+
+ void sourceStatisticsUpdated(const StreamSourceRTPPrx&, const RTCP::V1::StatisticsPtr& statistics, const Ice::Current&)
+ {
+ mSourceStatistics = statistics;
+ }
+
+ void sinkStatisticsUpdated(const StreamSinkRTPPrx&, const RTCP::V1::StatisticsPtr& statistics, const Ice::Current&)
+ {
+ mSinkStatistics = statistics;
+ }
+
+ void sourceSsrcChanged(const StreamSourceRTPPrx&, Ice::Int, const Ice::Current&)
+ {
+ mSsrcChanged = true;
+ }
+
+ RTCP::V1::StatisticsPtr mSourceStatistics;
+ RTCP::V1::StatisticsPtr mSinkStatistics;
+ bool mSsrcChanged;
+};
+
+typedef IceUtil::Handle<TestInformationListener> TestInformationListenerPtr;
+
/**
* It seems odd that boost doesn't provide an easy way to access the GLOBAL_FIXTURE members.
* But it doesn't seem to, so I'm sharing global setup stuff here.
@@ -110,6 +141,16 @@ public:
RtpStateReplicatorListenerPrx mListenerProxy;
/**
+ * Instance of our RTCP information listener.
+ */
+ TestInformationListenerPtr mInformationListener;
+
+ /**
+ * A proxy to our information listener.
+ */
+ RTCP::V1::InformationListenerPrx mInformationListenerProxy;
+
+ /**
* A proxy to the RTP session that we requested.
*/
RTPSessionPrx session;
@@ -275,6 +316,11 @@ struct GlobalIceFixture
Testbed.mListenerProxy = RtpStateReplicatorListenerPrx::uncheckedCast(Testbed.adapter->addWithUUID(Testbed.mListener));
+ Testbed.mInformationListener = new TestInformationListener();
+
+ Testbed.mInformationListenerProxy = RTCP::V1::InformationListenerPrx::uncheckedCast(Testbed.adapter->addWithUUID(
+ Testbed.mInformationListener));
+
Testbed.adapter->activate();
Testbed.locator = ServiceLocatorPrx::checkedCast(Testbed.communicator->stringToProxy("LocatorService:tcp -p 4411"));
@@ -476,6 +522,9 @@ BOOST_AUTO_TEST_CASE(ConfirmInitialReplicatedRTPSession)
StreamSourceRTPPrx source = StreamSourceRTPPrx::uncheckedCast(sources.front());
BOOST_CHECK(Testbed.mListener->mSession->mSourceIdentity == source->ice_getIdentity());
BOOST_CHECK(Testbed.mListener->mSession->mPort == source->getLocalPort());
+
+ BOOST_CHECK(!Testbed.mListener->mSession->mRemoteRtcpAddress.size());
+ BOOST_CHECK(!Testbed.mListener->mSession->mRemoteRtcpPort);
}
/**
@@ -569,6 +618,60 @@ BOOST_AUTO_TEST_CASE(VerifyLocalAddressonSources)
}
/**
+ * Check that we receive a local port that RTCP remote RTCP is to be sent to
+ */
+BOOST_AUTO_TEST_CASE(VerifyLocalRTCPPortonSession)
+{
+ int port = 0;
+
+ try
+ {
+ RTCP::V1::RTCPSessionPrx session = RTCP::V1::RTCPSessionPrx::checkedCast(Testbed.session,
+ RTCP::V1::SessionFacet);
+ port = session->getLocalPort();
+ }
+ catch (const Ice::Exception &e)
+ {
+ BOOST_TEST_MESSAGE(e.ice_name());
+ BOOST_TEST_MESSAGE(e.what());
+ }
+
+ BOOST_CHECK(port);
+}
+
+/**
+ * Check that we can set remote details for RTCP and that they get replicated
+ */
+BOOST_AUTO_TEST_CASE(VerifyReplicatedRTCPRemoteDetails)
+{
+ boost::mutex::scoped_lock lock(Testbed.mLock);
+
+#ifdef IPV6_TEST
+ std::string address = "::1";
+#else
+ std::string address = "127.0.0.1";
+#endif
+
+ try
+ {
+ RTCP::V1::RTCPSessionPrx session = RTCP::V1::RTCPSessionPrx::checkedCast(Testbed.session,
+ RTCP::V1::SessionFacet);
+
+ session->setRemoteDetails(address, 10001);
+ }
+ catch (const Ice::Exception &e)
+ {
+ BOOST_TEST_MESSAGE(e.ice_name());
+ BOOST_TEST_MESSAGE(e.what());
+ }
+
+ Testbed.mCondition.wait(lock);
+
+ BOOST_CHECK(Testbed.mListener->mSession->mRemoteRtcpAddress == address);
+ BOOST_CHECK(Testbed.mListener->mSession->mRemoteRtcpPort == 10001);
+}
+
+/**
* Check that the RTP session has at least one sink
*/
BOOST_AUTO_TEST_CASE(CheckForSink)
@@ -683,7 +786,7 @@ BOOST_AUTO_TEST_CASE(TransmitFrametoEmptySink)
/**
* Check that we can set the remote address information properly
*/
-BOOST_AUTO_TEST_CASE(ConfirmRemoteAddressSetting)
+BOOST_AUTO_TEST_CASE(ConfirmRemoteRTPAddressSetting)
{
bool set = false;
@@ -794,9 +897,102 @@ BOOST_AUTO_TEST_CASE(PushPayloadMappings)
}
/**
+ * Add RTCP information listener to source
+ */
+BOOST_AUTO_TEST_CASE(AddRTCPListenerToSource)
+{
+ bool added = false;
+
+ try
+ {
+ StreamSourceSeq sources = Testbed.session->getSources();
+ StreamSourceRTPPrx source = StreamSourceRTPPrx::uncheckedCast(sources.front());
+ RTCP::V1::InformationPrx information = RTCP::V1::InformationPrx::checkedCast(source,
+ RTCP::V1::Facet);
+ information->addListener(Testbed.mInformationListenerProxy);
+ added = true;
+ }
+ catch (const Ice::Exception &e)
+ {
+ BOOST_TEST_MESSAGE(e.ice_name());
+ BOOST_TEST_MESSAGE(e.what());
+ }
+ catch (...)
+ {
+ }
+
+ BOOST_CHECK(added);
+}
+
+/**
+ * Add RTCP information listener to sink
+ */
+BOOST_AUTO_TEST_CASE(AddRTCPListenerToSink)
+{
+ bool added = false;
+
+ try
+ {
+ StreamSinkSeq sinks = Testbed.session->getSinks();
+ StreamSinkRTPPrx sink = StreamSinkRTPPrx::uncheckedCast(sinks.front());
+ RTCP::V1::InformationPrx information = RTCP::V1::InformationPrx::checkedCast(sink,
+ RTCP::V1::Facet);
+ information->addListener(Testbed.mInformationListenerProxy);
+ added = true;
+ }
+ catch (const Ice::Exception &e)
+ {
+ BOOST_TEST_MESSAGE(e.ice_name());
+ BOOST_TEST_MESSAGE(e.what());
+ }
+ catch (...)
+ {
+ }
+
+ BOOST_CHECK(added);
+}
+
+/**
+ * Setup RTCP loopback from the session, to, well, the session
+ */
+BOOST_AUTO_TEST_CASE(SetupRTCPLoopback)
+{
+ bool looped = false;
+
+ try
+ {
+ StreamSourceSeq sources = Testbed.session->getSources();
+ StreamSourceRTPPrx source = StreamSourceRTPPrx::uncheckedCast(sources.front());
+ string address = source->getLocalAddress();
+
+ RTCP::V1::RTCPSessionPrx session = RTCP::V1::RTCPSessionPrx::checkedCast(Testbed.session,
+ RTCP::V1::SessionFacet);
+ int port = session->getLocalPort();
+
+ boost::mutex::scoped_lock lock(Testbed.mLock);
+
+ session->setRemoteDetails(address, port);
+
+ looped = true;
+
+ Testbed.mCondition.wait(lock);
+ }
+ catch (const Ice::Exception &e)
+ {
+ BOOST_TEST_MESSAGE(e.ice_name());
+ BOOST_TEST_MESSAGE(e.what());
+ }
+ catch (...)
+ {
+ }
+
+ BOOST_CHECK(looped);
+}
+
+/**
* Setup RTP loopback from the session to, well, the session
*/
-BOOST_AUTO_TEST_CASE(SetupLoopback)
+BOOST_AUTO_TEST_CASE(SetupRTPLoopback)
{
bool looped = false;
@@ -1051,6 +1247,83 @@ BOOST_AUTO_TEST_CASE(ReceiveUnknownRTPPacket)
}
/**
+ * Wait for and retrieve RTCP statistics information from the source.
+ */
+BOOST_AUTO_TEST_CASE(WaitForAndRetrieveSourceRTCPStatistics)
+{
+ RTCP::V1::StatisticsPtr statistics;
+
+ try
+ {
+ StreamSourceSeq sources = Testbed.session->getSources();
+ StreamSourceRTPPrx source = StreamSourceRTPPrx::uncheckedCast(sources.front());
+
+ RTCP::V1::InformationPrx information = RTCP::V1::InformationPrx::checkedCast(source, RTCP::V1::Facet);
+
+ // Give enough time for a few RTCP packets to be sent
+ IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(PJMEDIA_RTCP_INTERVAL * 4));
+
+ // Now retrieve the statistics and peek at them
+ statistics = information->getStatistics();
+ }
+ catch (const Ice::Exception &e)
+ {
+ BOOST_TEST_MESSAGE(e.ice_name());
+ BOOST_TEST_MESSAGE(e.what());
+ }
+ catch (...)
+ {
+ }
+
+ BOOST_CHECK(statistics);
+ BOOST_CHECK(statistics->roundTripDelay->last);
+ BOOST_CHECK(statistics->packets);
+}
+
+/**
+ * Retrieve RTCP statistics information from the sink.
+ */
+BOOST_AUTO_TEST_CASE(RetrieveSinkRTCPStatistics)
+{
+ RTCP::V1::StatisticsPtr statistics;
+
+ try
+ {
+ StreamSinkSeq sinks = Testbed.session->getSinks();
+ StreamSinkRTPPrx sink = StreamSinkRTPPrx::uncheckedCast(sinks.front());
+
+ RTCP::V1::InformationPrx information = RTCP::V1::InformationPrx::checkedCast(sink, RTCP::V1::Facet);
+ statistics = information->getStatistics();
+ }
+ catch (const Ice::Exception &e)
+ {
+ BOOST_TEST_MESSAGE(e.ice_name());
+ BOOST_TEST_MESSAGE(e.what());
+ }
+ catch (...)
+ {
+ }
+
+ BOOST_CHECK(statistics);
+ BOOST_CHECK(statistics->roundTripDelay->last);
+ BOOST_CHECK(statistics->packets);
+}
+
+/**
+ * Confirm that our information listener was called and that statistics information is proper.
+ */
+BOOST_AUTO_TEST_CASE(ConfirmInformationListenerCalled)
+{
+ BOOST_CHECK(Testbed.mInformationListener->mSourceStatistics);
+ BOOST_CHECK(Testbed.mInformationListener->mSourceStatistics->roundTripDelay->last);
+ BOOST_CHECK(Testbed.mInformationListener->mSourceStatistics->packets);
+
+ BOOST_CHECK(Testbed.mInformationListener->mSinkStatistics);
+ BOOST_CHECK(Testbed.mInformationListener->mSinkStatistics->roundTripDelay->last);
+ BOOST_CHECK(Testbed.mInformationListener->mSinkStatistics->packets);
+}
+
+/**
* Attempt to set an IPv4 address on an IPv6 only sink OR try to set an IPv6 address
* on an IPv4 only sink.
*/
-----------------------------------------------------------------------
--
asterisk-scf/release/media_rtp_pjmedia.git
More information about the asterisk-scf-commits
mailing list