[asterisk-scf-commits] asterisk-scf/release/media_rtp_pjmedia.git branch "master" updated.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Mon Aug 1 09:51:58 CDT 2011


branch "master" has been updated
       via  cdfa9be9fd76f722c1a801b8ecd0f1960ae543bd (commit)
      from  3f858d88944b37a662d7cb7448f95eed6992bad0 (commit)

Summary of changes:
 .../MediaRTPPJMedia/RtpStateReplicationIf.ice      |    3 +
 src/RTPSession.cpp                                 |  273 ++++++++++++++++++-
 src/RTPSession.h                                   |    1 +
 src/RTPSink.cpp                                    |    3 +
 src/RTPSource.cpp                                  |  232 ++++++++++++++++-
 src/RTPSource.h                                    |    8 +-
 src/RtpStateReplicatorListener.cpp                 |    2 +-
 src/SessionAdapter.h                               |    8 +
 test/TestRTPpjmedia.cpp                            |  277 +++++++++++++++++++-
 9 files changed, 780 insertions(+), 27 deletions(-)


- Log -----------------------------------------------------------------
commit cdfa9be9fd76f722c1a801b8ecd0f1960ae543bd
Author: Joshua Colp <jcolp at digium.com>
Date:   Mon Aug 1 11:51:26 2011 -0300

    Add support for sending and receiving of RTCP. This also adds support for getting statistic information from RTP originated sinks and sources.

diff --git a/slice/AsteriskSCF/Replication/MediaRTPPJMedia/RtpStateReplicationIf.ice b/slice/AsteriskSCF/Replication/MediaRTPPJMedia/RtpStateReplicationIf.ice
index ff9d0d2..f3aae1f 100644
--- a/slice/AsteriskSCF/Replication/MediaRTPPJMedia/RtpStateReplicationIf.ice
+++ b/slice/AsteriskSCF/Replication/MediaRTPPJMedia/RtpStateReplicationIf.ice
@@ -91,6 +91,8 @@ module V1
 	AsteriskSCF::Media::RTP::V1::PayloadMap mPayloadstoFormats;
 	bool mIPv6;
         bool mSRTP;
+        string mRemoteRtcpAddress;
+        int mRemoteRtcpPort;
     };
 
     class RtpStreamSinkStateItem extends RtpStateItem
@@ -104,6 +106,7 @@ module V1
     {
 	AsteriskSCF::Media::V1::StreamSinkSeq mSinks;
     };
+
 }; /* module V1 */
 
 }; /* module MediaRTPPJMedia */
diff --git a/src/RTPSession.cpp b/src/RTPSession.cpp
index 88fca37..69c1fea 100644
--- a/src/RTPSession.cpp
+++ b/src/RTPSession.cpp
@@ -33,18 +33,108 @@
 
 #include <AsteriskSCF/Media/MediaIf.h>
 #include <AsteriskSCF/Media/RTP/MediaRTPIf.h>
+#include <AsteriskSCF/Media/RTP/MediaRTCPIf.h>
 #include <AsteriskSCF/System/Component/ReplicaIf.h>
 
 using namespace std;
 using namespace AsteriskSCF::Core::Discovery::V1;
 using namespace AsteriskSCF::Media::V1;
 using namespace AsteriskSCF::Media::RTP::V1;
+using namespace AsteriskSCF::Media;
 using namespace AsteriskSCF::Replication::MediaRTPPJMedia::V1;
 using namespace AsteriskSCF::System::Component::V1;
 using namespace AsteriskSCF::Discovery;
 using namespace AsteriskSCF::PJMediaRTP;
 
 /**
+ * RTCP Information Interface implementation.
+ */
+class RTCPInformationImpl : public RTCP::V1::Information
+{
+public:
+    RTCPInformationImpl(pjmedia_rtcp_stat *general, pjmedia_rtcp_stream_stat *stream) :
+        mGeneralStatistics(general), mStreamStatistics(stream) { }
+
+    RTCP::V1::StatisticsPtr getStatistics(const Ice::Current&)
+    {
+	RTCP::V1::StatisticsPtr statistics = new RTCP::V1::Statistics();
+
+        statistics->roundTripDelay = new RTCP::V1::ExtendedDetails();
+        statistics->roundTripDelay->maximum = mGeneralStatistics->rtt.max;
+        statistics->roundTripDelay->minimum = mGeneralStatistics->rtt.min;
+        statistics->roundTripDelay->last = mGeneralStatistics->rtt.last;
+        statistics->roundTripDelay->mean = mGeneralStatistics->rtt.mean;
+
+        statistics->packets = mStreamStatistics->pkt;
+        statistics->discardedPackets = mStreamStatistics->discard;
+        statistics->lostPackets = mStreamStatistics->loss;
+        statistics->outOfOrderPackets = mStreamStatistics->reorder;
+        statistics->duplicatePackets = mStreamStatistics->dup;
+
+        statistics->loss = new RTCP::V1::ExtendedDetails();
+        statistics->loss->maximum = mStreamStatistics->loss_period.max;
+        statistics->loss->minimum = mStreamStatistics->loss_period.min;
+        statistics->loss->last = mStreamStatistics->loss_period.last;
+        statistics->loss->mean = mStreamStatistics->loss_period.mean;
+
+        statistics->jitter = new RTCP::V1::ExtendedDetails();
+        statistics->jitter->maximum = mStreamStatistics->jitter.max;
+        statistics->jitter->minimum = mStreamStatistics->jitter.min;
+        statistics->jitter->last = mStreamStatistics->jitter.last;
+        statistics->jitter->mean = mStreamStatistics->jitter.mean;
+
+        return statistics;
+    }
+
+    void addListener(const RTCP::V1::InformationListenerPrx& listener, const Ice::Current&)
+    {
+	boost::unique_lock<boost::shared_mutex> lock(mLock);
+        mListeners.push_back(listener);
+    }
+
+    void removeListener(const RTCP::V1::InformationListenerPrx& listener, const Ice::Current&)
+    {
+	boost::unique_lock<boost::shared_mutex> lock(mLock);
+        mListeners.erase(std::remove(mListeners.begin(), mListeners.end(), listener), mListeners.end());
+    }
+
+    /**
+     * Implementation specific function which returns a copy of the listeners.
+     */
+    std::vector<RTCP::V1::InformationListenerPrx> getListeners()
+    {
+	boost::shared_lock<boost::shared_mutex> lock(mLock);
+        return mListeners;
+    }
+
+private:
+    /**
+     * Lock to protect the listeners.
+     */
+    boost::shared_mutex mLock;
+
+    /**
+     * Listeners present.
+     */
+    std::vector<RTCP::V1::InformationListenerPrx> mListeners;
+
+    /**
+     * Structure where general RTCP information is.
+     */
+    pjmedia_rtcp_stat *mGeneralStatistics;
+
+    /**
+     * Structure where stream RTCP information is.
+     */
+    pjmedia_rtcp_stream_stat *mStreamStatistics;
+};
+
+/**
+ * Smart pointer for the above RTCPInformationImpl class.
+ */
+typedef IceUtil::Handle<RTCPInformationImpl> RTCPInformationImplPtr;
+
+/**
  * Implementation of the RTPSession interface as defined in MediaRTPIf.ice
  */
 class RTPSessionImpl : public AsteriskSCF::Media::RTP::V1::SRTPSession
@@ -99,6 +189,8 @@ public:
      */
     StreamSourceRTPImplPtr getSourceServant();
     StreamSinkRTPImplPtr getSinkServant();
+    PJMediaTransportPtr getTransport();
+
 
     void replicateState(const AsteriskSCF::Replication::MediaRTPPJMedia::V1::RtpSessionStateItemPtr&,
             const AsteriskSCF::Replication::MediaRTPPJMedia::V1::RtpStreamSinkStateItemPtr&,
@@ -108,6 +200,12 @@ public:
             const AsteriskSCF::Replication::MediaRTPPJMedia::V1::RtpStreamSourceStateItemPtr&);
 
     void associatePayloadsImpl(const AsteriskSCF::Media::RTP::V1::PayloadMap& payloadMap);
+    void setRemoteRtcpDetails(const std::string&, Ice::Int);
+    pjmedia_rtcp_session* getRtcpSession();
+    std::vector<AsteriskSCF::Media::RTCP::V1::InformationListenerPrx> getReceiverReportListeners();
+    AsteriskSCF::Media::RTCP::V1::StatisticsPtr getReceiverReportStatistics();
+    std::vector<AsteriskSCF::Media::RTCP::V1::InformationListenerPrx> getSenderReportListeners();
+    AsteriskSCF::Media::RTCP::V1::StatisticsPtr getSenderReportStatistics();
 
     RTPSessionPrx activate(const string& id);
     RTPSessionPrx activate(const Ice::Identity& id, const Ice::Identity& sourceId, const Ice::Identity& sinkId);
@@ -193,6 +291,26 @@ private:
      * A proxy to the state replicator where we are sending updates to.
      */
     AsteriskSCF::Discovery::SmartProxy<RtpStateReplicatorPrx> mStateReplicator;
+
+    /**
+     * RTCP session for this RTP session.
+     */
+    pjmedia_rtcp_session mRtcpSession;
+
+    /**
+     * A pointer to the RTCP session interface.
+     */
+    RTCP::V1::RTCPSessionPtr mRtcpSessionInterface;
+
+    /**
+     * RTCP information for Receiver Report.
+     */
+    RTCPInformationImplPtr mReceiverReport;
+
+    /**
+     * RTCP information for Sender Report.
+     */
+    RTCPInformationImplPtr mSenderReport;
 };
 
 /**
@@ -200,6 +318,42 @@ private:
  */
 typedef IceUtil::Handle<RTPSessionImpl> RTPSessionImplPtr;
 
+/**
+ * Implementation of the RTCPSession interface as defined in MediaRTCPIf.ice
+ */
+class RTCPSessionImpl : public RTCP::V1::RTCPSession
+{
+public:
+    /**
+     * Constructor for this implementation.
+     */
+    RTCPSessionImpl(const RTPSessionImplPtr& session) : mSession(session) { }
+
+    /**
+     * Method used to retrieve the port our RTCP session is listening on.
+     */
+    int getLocalPort(const Ice::Current&)
+    {
+        pjmedia_transport_info transportInfo;
+
+        pjmedia_transport_info_init(&transportInfo);
+        pjmedia_transport_get_info(mSession->getTransport()->getTransport(), &transportInfo);
+
+        return pj_sockaddr_get_port(&transportInfo.sock_info.rtcp_addr_name);
+    }
+
+    void setRemoteDetails(const std::string& address, Ice::Int port, const Ice::Current&)
+    {
+        mSession->setRemoteRtcpDetails(address, port);
+    }
+
+private:
+    /**
+     * Pointer to the RTP session.
+     */
+    RTPSessionImplPtr mSession;
+};
+
 //
 // Provides an adapter to the session implementation without having to expose the entire header file. This is a
 // step-wise decoupling of the session implementation from the other objects in this component. The whole
@@ -243,6 +397,31 @@ public:
         mServant->setRemoteDetails(host, port);
     }
 
+    pjmedia_rtcp_session* getRtcpSession()
+    {
+        return mServant->getRtcpSession();
+    }
+
+    std::vector<AsteriskSCF::Media::RTCP::V1::InformationListenerPrx> getReceiverReportListeners()
+    {
+        return mServant->getReceiverReportListeners();
+    }
+
+    AsteriskSCF::Media::RTCP::V1::StatisticsPtr getReceiverReportStatistics()
+    {
+        return mServant->getReceiverReportStatistics();
+    }
+
+    std::vector<AsteriskSCF::Media::RTCP::V1::InformationListenerPrx> getSenderReportListeners()
+    {
+        return mServant->getSenderReportListeners();
+    }
+
+    AsteriskSCF::Media::RTCP::V1::StatisticsPtr getSenderReportStatistics()
+    {
+        return mServant->getSenderReportStatistics();
+    }
+
 private:
     RTPSessionImplPtr mServant;
 };
@@ -301,6 +480,8 @@ RTPSessionImpl::RTPSessionImpl(const Ice::ObjectAdapterPtr& adapter,
     mSessionStateItem->mSessionIdentity = mAdapter->getCommunicator()->stringToIdentity(mId);
     mSessionStateItem->mFormats = params->formats;
     mSessionStateItem->mIPv6 = params->ipv6;
+
+    pjmedia_rtcp_init(&mRtcpSession, NULL, 8000, 160, 0);
 }
 
 /**
@@ -340,6 +521,8 @@ RTPSessionImpl::RTPSessionImpl(const Ice::ObjectAdapterPtr& adapter,
     {
         mTransport = SRTPTransport::create(mTransport, mEndpoint, configurationService);
     }
+
+    pjmedia_rtcp_init(&mRtcpSession, NULL, 8000, 160, 0);
 }
 
 /**
@@ -383,16 +566,6 @@ std::string RTPSessionImpl::getId(const Ice::Current&)
 }
 
 /**
- * Implementation of the useRTCP method as defined in MediaRTPIf.ice
- */
-void RTPSessionImpl::useRTCP(bool, const Ice::Current&)
-{
-    //
-    // TODO.
-    //
-}
-
-/**
  * Implementation of the release method as defined in MediaRTPIf.ice
  */
 void RTPSessionImpl::release(const Ice::Current&)
@@ -404,7 +577,7 @@ void RTPSessionImpl::release(const Ice::Current&)
     mAdapter->remove(mStreamSourceProxy->ice_getIdentity());
     mAdapter->remove(mStreamSinkProxy->ice_getIdentity());
 
-    /* Since both the source and sink have a pointer back to the session we need to get rid of them,
+    /* Since the source and sink have a pointer back to the session we need to get rid of them,
      * which will in turn get rid of ourselves once we are removed from the ASM.
      */
     mStreamSource = 0;
@@ -450,6 +623,56 @@ void RTPSessionImpl::start(const string& suiteName, const string& key, bool enab
 }
 
 /**
+ * API call which returns the RTCP session used for this RTP session.
+ *
+ * @return A pointer to the RTCP session.
+ */
+pjmedia_rtcp_session* RTPSessionImpl::getRtcpSession()
+{
+    return &mRtcpSession;
+}
+
+/**
+ * API call which returns the listeners wanting statistics for receiving as they change.
+ *
+ * @return A copy of the listeners wanting the statistics.
+ */
+std::vector<AsteriskSCF::Media::RTCP::V1::InformationListenerPrx> RTPSessionImpl::getReceiverReportListeners()
+{
+    return mReceiverReport->getListeners();
+}
+
+/**
+ * API call which returns current statistics for receiving.
+ *
+ * @return The current receiving statistics.
+ */
+AsteriskSCF::Media::RTCP::V1::StatisticsPtr RTPSessionImpl::getReceiverReportStatistics()
+{
+    return mReceiverReport->getStatistics(Ice::Current());
+}
+
+/**
+ * API call which returns the listeners wanting statistics for sending as they change.
+ *
+ * @return A copy of the listeners wanting the statistics.
+ */
+std::vector<AsteriskSCF::Media::RTCP::V1::InformationListenerPrx> RTPSessionImpl::getSenderReportListeners()
+{
+    return mSenderReport->getListeners();
+}
+
+/**
+ * API call which returns current statistics for sending.
+ *
+ * @return The current sending statistics.
+ */
+AsteriskSCF::Media::RTCP::V1::StatisticsPtr RTPSessionImpl::getSenderReportStatistics()
+{
+    return mSenderReport->getStatistics(Ice::Current());
+}
+
+/**
  * API call which returns the formats the RTP session is expected to carry.
  *
  * @return A sequence of media formats.
@@ -468,6 +691,19 @@ void RTPSessionImpl::setRemoteDetails(const string& address, Ice::Int port)
 }
 
 /**
+ * API call which calls into RTPSourceImpl in order to setup transport.
+ */
+void RTPSessionImpl::setRemoteRtcpDetails(const std::string& address, Ice::Int port)
+{
+    mSessionStateItem->mRemoteRtcpAddress = address;
+    mSessionStateItem->mRemoteRtcpPort = port;
+
+    mStreamSource->setRemoteRtcpDetails(address, port);
+
+    replicateState(mSessionStateItem, 0, 0);
+}
+
+/**
  * API call which returns a media format based on payload.
  *
  * @return The media format corresponding to the payload.
@@ -511,6 +747,11 @@ StreamSinkRTPImplPtr RTPSessionImpl::getSinkServant()
     return mStreamSink;
 }
 
+PJMediaTransportPtr RTPSessionImpl::getTransport()
+{
+    return mTransport;
+}
+
 /**
  * API call which replicates state items.
  */
@@ -623,10 +864,17 @@ RTPSessionPrx RTPSessionImpl::activate(const Ice::Identity& id, const Ice::Ident
     mSessionAdapter.reset(new SessionAdapterImpl(this));
     try
     {
-        mStreamSource = new StreamSourceRTPImpl(mSessionAdapter, mTransport, mId);
+        mStreamSource = new StreamSourceRTPImpl(mSessionAdapter, mTransport, mId, StreamSourceRTPPrx::uncheckedCast(mAdapter->createDirectProxy(sourceId)),
+                                                StreamSinkRTPPrx::uncheckedCast(mAdapter->createDirectProxy(sinkId)));
         mStreamSink = new StreamSinkRTPImpl(mSessionAdapter, mTransport, mId);
+        mRtcpSessionInterface = new RTCPSessionImpl(this);
+        mReceiverReport = new RTCPInformationImpl(&mRtcpSession.stat, &mRtcpSession.stat.rx);
+        mSenderReport = new RTCPInformationImpl(&mRtcpSession.stat, &mRtcpSession.stat.tx);
         mStreamSourceProxy = StreamSourceRTPPrx::uncheckedCast(mAdapter->add(mStreamSource, sourceId));
         mStreamSinkProxy = StreamSinkRTPPrx::uncheckedCast(mAdapter->add(mStreamSink, sinkId));
+        mAdapter->addFacet(mRtcpSessionInterface, id, RTCP::V1::SessionFacet);
+        mAdapter->addFacet(mReceiverReport, sourceId, RTCP::V1::Facet);
+        mAdapter->addFacet(mSenderReport, sinkId, RTCP::V1::Facet);
 
         if (mSessionStateItem)
         {
@@ -686,6 +934,7 @@ public:
     void update(const RtpSessionStateItemPtr& item)
     {
         mImpl->associatePayloadsImpl(item->mPayloadstoFormats);
+        mImpl->setRemoteRtcpDetails(item->mRemoteRtcpAddress, item->mRemoteRtcpPort);
     }
 
     void update(const RtpStreamSinkStateItemPtr& item)
diff --git a/src/RTPSession.h b/src/RTPSession.h
index b6f6d54..a4f8072 100644
--- a/src/RTPSession.h
+++ b/src/RTPSession.h
@@ -19,6 +19,7 @@
 #include <AsteriskSCF/Discovery/SmartProxy.h>
 #include <boost/shared_ptr.hpp>
 
+
 namespace AsteriskSCF
 {
 namespace PJMediaRTP
diff --git a/src/RTPSink.cpp b/src/RTPSink.cpp
index 7d3d07b..8184c1a 100644
--- a/src/RTPSink.cpp
+++ b/src/RTPSink.cpp
@@ -154,6 +154,9 @@ void StreamSinkRTPImpl::write(const AsteriskSCF::Media::V1::FrameSeq& frames, co
             /* TODO: Transmission failed... what to do? */
             continue;
         }
+
+	// Update RTCP information
+	pjmedia_rtcp_tx_rtp(mImpl->mSessionAdapter->getRtcpSession(), static_cast<unsigned int>((*frame)->payload.size()));
     }
 }
 
diff --git a/src/RTPSource.cpp b/src/RTPSource.cpp
index ed3ad97..4ae46f9 100644
--- a/src/RTPSource.cpp
+++ b/src/RTPSource.cpp
@@ -22,16 +22,19 @@
 
 #include <Ice/Ice.h>
 #include <IceUtil/UUID.h>
+#include <IceUtil/Timer.h>
 
 #include <boost/thread.hpp>
 
 #include <AsteriskSCF/Media/MediaIf.h>
 #include <AsteriskSCF/Media/RTP/MediaRTPIf.h>
+#include <AsteriskSCF/Media/RTP/MediaRTCPIf.h>
 #include <AsteriskSCF/logger.h>
 #include <AsteriskSCF/System/Component/ReplicaIf.h>
 
 using namespace std;
 using namespace AsteriskSCF::Core::Discovery::V1;
+using namespace AsteriskSCF::Media;
 using namespace AsteriskSCF::Media::V1;
 using namespace AsteriskSCF::Media::RTP::V1;
 using namespace AsteriskSCF::Replication::MediaRTPPJMedia::V1;
@@ -44,6 +47,63 @@ Logger lg = getLoggerFactory().getLogger("AsteriskSCF.MediaRTP");
 }
 
 /**
+ * TimerTask implementation which sends RTCP at a defined interval.
+ */
+class RtcpTransmission : public IceUtil::TimerTask
+{
+public:
+    RtcpTransmission(const SessionAdapterPtr& sessionAdapter, const StreamSinkRTPPrx& sink, const PJMediaTransportPtr& transport) :
+        mSessionAdapter(sessionAdapter), mSink(sink), mTransport(transport) { }
+
+    void runTimerTask()
+    {
+        void *packet;
+        int packet_size;
+
+        pjmedia_rtcp_build_rtcp(mSessionAdapter->getRtcpSession(), &packet, &packet_size);
+        pjmedia_transport_send_rtcp(mTransport->getTransport(), packet, packet_size);
+
+	std::vector<RTCP::V1::InformationListenerPrx> listeners = mSessionAdapter->getSenderReportListeners();
+
+        // If no listeners exist don't bother getting the statistics
+        if (listeners.empty())
+        {
+            return;
+        }
+
+	RTCP::V1::StatisticsPtr statistics = mSessionAdapter->getSenderReportStatistics();
+
+        for (std::vector<RTCP::V1::InformationListenerPrx>::const_iterator listener = listeners.begin();
+             listener != listeners.end();
+             ++listener)
+        {
+            (*listener)->sinkStatisticsUpdated(mSink, statistics);
+        }
+    }
+
+private:
+    /**
+     * A pointer to the session adapter.
+     */
+    SessionAdapterPtr mSessionAdapter;
+
+    /**
+     * Proxy to the sink.
+     */
+    StreamSinkRTPPrx mSink;
+
+    /**
+     * Pointer to the transport used for communication.
+     */
+    PJMediaTransportPtr mTransport;
+};
+
+/**
+ * Smart pointer for the above RtcpTransmission class.
+ */
+typedef IceUtil::Handle<RtcpTransmission> RtcpTransmissionPtr;
+
+/**
  * Private implementation details for the StreamSourceRTPImpl class.
  */
 class StreamSourceRTPImplPriv
@@ -53,8 +113,16 @@ public:
      * Constructor for our StreamSourceRTPImplPriv class.
      */
     StreamSourceRTPImplPriv(const SessionAdapterPtr& sessionAdapter, 
-        const PJMediaTransportPtr& transport,
-        const string& parentSessionId);
+                            const PJMediaTransportPtr& transport,
+                            const string& parentSessionId,
+                            const StreamSourceRTPPrx& source,
+                            const StreamSinkRTPPrx& sink
+        );
+
+    /**
+     * Destructor for our StreamSourceRTPImplPriv class.
+     */
+    ~StreamSourceRTPImplPriv();
 
     /**
      * A structure containing incoming pjmedia session data.
@@ -78,6 +146,21 @@ public:
 
     string mSessionId;
     /**
+     * Timer used for sending RTCP reports.
+     */
+    IceUtil::TimerPtr mTimer;
+
+    /**
+     * Source of media, conveyed to RTCP listeners.
+     */
+    StreamSourceRTPPrx mSource;
+
+    /**
+     * Sink for media, conveyed to RTCP listeners.
+     */
+    StreamSinkRTPPrx mSink;
+
+    /**
      * Lock that protects information contained.
      */
     boost::shared_mutex mLock;
@@ -87,23 +170,41 @@ public:
  * Constructor for the StreamSourceRTPImplPriv class.
  */
 StreamSourceRTPImplPriv::StreamSourceRTPImplPriv(const SessionAdapterPtr& session, 
-    const PJMediaTransportPtr& transport, 
-    const string& sessionId) :
+                                                 const PJMediaTransportPtr& transport, 
+                                                 const string& sessionId,
+                                                 const StreamSourceRTPPrx& source,
+                                                 const StreamSinkRTPPrx& sink) :
     mSessionAdapter(session), mTransport(transport), 
     mSourceStateItem(new RtpStreamSourceStateItem), 
-    mSessionId(sessionId)
+    mSessionId(sessionId),
+    mSource(source),
+    mSink(sink)
 {
     pjmedia_rtp_session_init(&mIncomingSession, 0, 0);
     mSourceStateItem->mSessionId = sessionId;
     mSourceStateItem->key = IceUtil::generateUUID();
-};
+}
+
+/**
+ * Destructor for the StreamSourceRTPImplPriv class.
+ */
+StreamSourceRTPImplPriv::~StreamSourceRTPImplPriv()
+{
+    // Destroy the RTCP transmission timer if it exists
+    if (mTimer)
+    {
+        mTimer->destroy();
+    }
+}
 
 /**
  * Constructor for the StreamSourceRTPImpl class.
  */
 StreamSourceRTPImpl::StreamSourceRTPImpl(const SessionAdapterPtr& session,
-        const PJMediaTransportPtr& transport, const string& sessionId) :
-    mImpl(new StreamSourceRTPImplPriv(session, transport, sessionId))
+                                         const PJMediaTransportPtr& transport, const string& sessionId,
+                                         const StreamSourceRTPPrx& source,
+                                         const StreamSinkRTPPrx& sink) :
+    mImpl(new StreamSourceRTPImplPriv(session, transport, sessionId, source, sink))
 {
 }
 
@@ -233,7 +334,27 @@ static void receiveRTP(void *userdata, void *packet, pj_ssize_t size)
     }
 
     // Update RTP stack information before writing to sinks, it's fine to do it
-    pjmedia_rtp_session_update(&source->mImpl->mIncomingSession, header, NULL);
+    pjmedia_rtp_status rtpStatus;
+    pjmedia_rtp_session_update2(&source->mImpl->mIncomingSession, header, &rtpStatus, PJ_FALSE);
+
+    // Update RTCP information
+    pjmedia_rtcp_rx_rtp2(source->mImpl->mSessionAdapter->getRtcpSession(), pj_ntohs(header->seq), pj_ntohl(header->ts),
+                         payload_size, ((rtpStatus.status.value && rtpStatus.status.flag.bad) || !payload_size)
+                         ? PJ_TRUE : PJ_FALSE);
+
+    // If the SSRC has changed contact any appropriate listeners
+    if (rtpStatus.status.value && rtpStatus.status.flag.badssrc)
+    {
+        std::vector<AsteriskSCF::Media::RTCP::V1::InformationListenerPrx> listeners =
+            source->mImpl->mSessionAdapter->getReceiverReportListeners();
+
+        for (std::vector<AsteriskSCF::Media::RTCP::V1::InformationListenerPrx>::const_iterator listener = listeners.begin();
+             listener != listeners.end();
+             ++listener)
+        {
+            (*listener)->sourceSsrcChanged(source->mImpl->mSource, source->mImpl->mIncomingSession.peer_ssrc);
+        }
+    }
 
     if (source->mImpl->mSourceStateItem->mSinks.empty())
     {
@@ -304,6 +425,40 @@ static void receiveRTP(void *userdata, void *packet, pj_ssize_t size)
 }
 
 /**
+ * Function which is called when RTCP is received.
+ */
+static void receiveRTCP(void *userdata, void *packet, pj_ssize_t size)
+{
+    StreamSourceRTPImpl* source = static_cast<StreamSourceRTPImpl*>(userdata);
+
+    /* Ensure that no errors occurred when reading this packet in */
+    if (size < 0)
+    {
+        lg(Error) << "We attempted to read data from an RTCP session but failed.";
+        return;
+    }
+
+    pjmedia_rtcp_rx_rtcp(source->mImpl->mSessionAdapter->getRtcpSession(), packet, size);
+
+    std::vector<AsteriskSCF::Media::RTCP::V1::InformationListenerPrx> listeners = 
+        source->mImpl->mSessionAdapter->getReceiverReportListeners();
+
+    if (listeners.empty())
+    {
+        return;
+    }
+
+    AsteriskSCF::Media::RTCP::V1::StatisticsPtr statistics = source->mImpl->mSessionAdapter->getReceiverReportStatistics();
+
+    for (std::vector<AsteriskSCF::Media::RTCP::V1::InformationListenerPrx>::const_iterator listener = listeners.begin();
+         listener != listeners.end();
+         ++listener)
+    {
+        (*listener)->sourceStatisticsUpdated(source->mImpl->mSource, statistics);
+    }
+}
+
+/**
  * API call which sets up our pjmedia transport and allows media to be sent and received.
  */
 void StreamSourceRTPImpl::setRemoteDetails(const string& address, Ice::Int port)
@@ -339,7 +494,8 @@ void StreamSourceRTPImpl::setRemoteDetails(const string& address, Ice::Int port)
     pjmedia_transport_detach(mImpl->mTransport->getTransport(), this);
 
     /* All ready... actually do it! */
-    status = pjmedia_transport_attach(mImpl->mTransport->getTransport(), this, &addr, NULL, pj_sockaddr_get_len(&addr), &receiveRTP, NULL);
+    status = pjmedia_transport_attach(mImpl->mTransport->getTransport(), this, &addr, &transportInfo.src_rtcp_name,
+                                      pj_sockaddr_get_len(&addr), &receiveRTP, &receiveRTCP);
 
     if (status != PJ_SUCCESS)
     {
@@ -348,6 +504,62 @@ void StreamSourceRTPImpl::setRemoteDetails(const string& address, Ice::Int port)
 }
 
 /**
+ * API call which sets up our pjmedia transport and allows media to be sent and received.
+ */
+void StreamSourceRTPImpl::setRemoteRtcpDetails(const std::string& address, Ice::Int port)
+{
+    pj_sockaddr addr;
+
+    /* This feels so dirty but convert from our std::string to a pj_str, since their API requires it. */
+    pj_str_t tmpAddress;
+    pj_strset(&tmpAddress, (char*)address.c_str(), address.size());
+
+    /* Now for the next trick - convert into a pj_sockaddr so we can pass it to pjmedia_transport_attach */
+    pj_status_t status = pj_sockaddr_parse(pj_AF_UNSPEC(), 0, &tmpAddress, &addr);
+
+    if (status != PJ_SUCCESS)
+    {
+        throw InvalidAddress();
+    }
+
+    // Confirm that the address family of the address matches that of this RTP session
+    pjmedia_transport_info transportInfo;
+
+    pjmedia_transport_info_init(&transportInfo);
+    pjmedia_transport_get_info(mImpl->mTransport->getTransport(), &transportInfo);
+
+    if (transportInfo.sock_info.rtcp_addr_name.addr.sa_family != addr.addr.sa_family)
+    {
+        throw InvalidAddress();
+    }
+
+    pj_sockaddr_set_port(&addr, static_cast<pj_uint16_t>(port));
+
+    /* In case we were already attached go ahead and detach */
+    pjmedia_transport_detach(mImpl->mTransport->getTransport(), this);
+
+    /* All ready... actually do it! */
+    status = pjmedia_transport_attach(mImpl->mTransport->getTransport(), this, &transportInfo.src_rtp_name, &addr,
+                                      pj_sockaddr_get_len(&addr), &receiveRTP, &receiveRTCP);
+
+    if (status != PJ_SUCCESS)
+    {
+        throw InvalidAddress();
+    }
+
+    // If RTCP is not already being sent start sending it
+    if (!mImpl->mTimer && (mImpl->mTimer = new IceUtil::Timer()))
+    {
+        RtcpTransmissionPtr transmission;
+
+        if ((transmission = new RtcpTransmission(mImpl->mSessionAdapter, mImpl->mSink, mImpl->mTransport)))
+        {
+            mImpl->mTimer->scheduleRepeated(transmission, IceUtil::Time::milliSeconds(PJMEDIA_RTCP_INTERVAL));
+        }
+    }
+}
+
+/**
  * API call which returns a pointer to the source state item.
  */
 RtpStreamSourceStateItemPtr StreamSourceRTPImpl::getStateItem()
diff --git a/src/RTPSource.h b/src/RTPSource.h
index f100cd4..aeeff2c 100644
--- a/src/RTPSource.h
+++ b/src/RTPSource.h
@@ -25,8 +25,10 @@ class StreamSourceRTPImpl : public AsteriskSCF::Media::RTP::V1::StreamSourceRTP
 {
 public:
     StreamSourceRTPImpl(const AsteriskSCF::PJMediaRTP::SessionAdapterPtr& sessionAdapter, 
-        const AsteriskSCF::PJMediaRTP::PJMediaTransportPtr& transport,
-        const std::string& parentSessionId);
+                        const AsteriskSCF::PJMediaRTP::PJMediaTransportPtr& transport,
+                        const std::string& parentSessionId,
+                        const AsteriskSCF::Media::RTP::V1::StreamSourceRTPPrx& source,
+                        const AsteriskSCF::Media::RTP::V1::StreamSinkRTPPrx& sink);
 
     void addSink(const AsteriskSCF::Media::V1::StreamSinkPrx&, const Ice::Current&);
     void removeSink(const AsteriskSCF::Media::V1::StreamSinkPrx&, const Ice::Current&);
@@ -39,6 +41,8 @@ public:
 
     void setRemoteDetails(const std::string& address, Ice::Int port);
     void setSinksImpl(const AsteriskSCF::Media::V1::StreamSinkSeq&);
+    void setRemoteRtcpDetails(const std::string&, Ice::Int);
+
     AsteriskSCF::Replication::MediaRTPPJMedia::V1::RtpStreamSourceStateItemPtr getStateItem();
 
     void setSinkImpl(const AsteriskSCF::Media::V1::StreamSinkPrx& proxy);
diff --git a/src/RtpStateReplicatorListener.cpp b/src/RtpStateReplicatorListener.cpp
index 4e1ab03..9ceb5a5 100644
--- a/src/RtpStateReplicatorListener.cpp
+++ b/src/RtpStateReplicatorListener.cpp
@@ -114,7 +114,7 @@ public:
 		{
 		    localitem = i->second;
 		}
-
+		
                 //
                 // TODO: This appears to happen in testing on occasion. Should verify if this should be
                 // expected.
diff --git a/src/SessionAdapter.h b/src/SessionAdapter.h
index d164bca..de017b2 100644
--- a/src/SessionAdapter.h
+++ b/src/SessionAdapter.h
@@ -20,10 +20,13 @@
 #include <boost/shared_ptr.hpp>
 #include "RtpStateReplicationIf.h"
 
+#include <AsteriskSCF/Media/RTP/MediaRTCPIf.h>
+
 //
 // XXX forward declaration that will be obsoleted soon.
 // 
 struct pjmedia_transport;
+struct pjmedia_rtcp_session;
 
 namespace AsteriskSCF
 {
@@ -53,6 +56,11 @@ public:
     virtual int getPayload(const AsteriskSCF::Media::V1::FormatPtr& format) = 0;
     virtual AsteriskSCF::Media::V1::FormatSeq getFormats() = 0;
     virtual void setRemoteDetails(const std::string& host, int port) = 0;
+    virtual pjmedia_rtcp_session* getRtcpSession() = 0;
+    virtual std::vector<AsteriskSCF::Media::RTCP::V1::InformationListenerPrx> getReceiverReportListeners() = 0;
+    virtual AsteriskSCF::Media::RTCP::V1::StatisticsPtr getReceiverReportStatistics() = 0;
+    virtual std::vector<AsteriskSCF::Media::RTCP::V1::InformationListenerPrx> getSenderReportListeners() = 0;
+    virtual AsteriskSCF::Media::RTCP::V1::StatisticsPtr getSenderReportStatistics() = 0;
 };
 
 } /* End of namespace PJMediaRTP */
diff --git a/test/TestRTPpjmedia.cpp b/test/TestRTPpjmedia.cpp
index 212c58e..ca15d65 100644
--- a/test/TestRTPpjmedia.cpp
+++ b/test/TestRTPpjmedia.cpp
@@ -21,6 +21,8 @@
 #endif
 #define BOOST_TEST_NO_MAIN
 
+#include <pjmedia.h>
+
 #include <boost/test/unit_test.hpp>
 #include <boost/test/debug.hpp>
 #include <boost/thread/thread.hpp>
@@ -33,11 +35,13 @@
 #include <AsteriskSCF/Core/Discovery/ServiceLocatorIf.h>
 #include <AsteriskSCF/Media/MediaIf.h>
 #include <AsteriskSCF/Media/RTP/MediaRTPIf.h>
+#include <AsteriskSCF/Media/RTP/MediaRTCPIf.h>
 
 #include "RtpStateReplicationIf.h"
 
 using namespace std;
 using namespace AsteriskSCF::Core::Discovery::V1;
+using namespace AsteriskSCF::Media;
 using namespace AsteriskSCF::Media::V1;
 using namespace AsteriskSCF::Media::RTP::V1;
 using namespace AsteriskSCF::Replication::MediaRTPPJMedia::V1;
@@ -77,6 +81,33 @@ public:
 
 typedef IceUtil::Handle<TestRtpReplicatorListener> TestRtpReplicatorListenerPtr;
 
+class TestInformationListener : public RTCP::V1::InformationListener
+{
+public:
+    TestInformationListener() : mSsrcChanged(false) { }
+
+    void sourceStatisticsUpdated(const StreamSourceRTPPrx&, const RTCP::V1::StatisticsPtr& statistics, const Ice::Current&)
+    {
+        mSourceStatistics = statistics;
+    }
+
+    void sinkStatisticsUpdated(const StreamSinkRTPPrx&, const RTCP::V1::StatisticsPtr& statistics, const Ice::Current&)
+    {
+        mSinkStatistics = statistics;
+    }
+
+    void sourceSsrcChanged(const StreamSourceRTPPrx&, Ice::Int, const Ice::Current&)
+    {
+        mSsrcChanged = true;
+    }
+
+    RTCP::V1::StatisticsPtr mSourceStatistics;
+    RTCP::V1::StatisticsPtr mSinkStatistics;
+    bool mSsrcChanged;
+};
+
+typedef IceUtil::Handle<TestInformationListener> TestInformationListenerPtr;
+
 /**
  * It seems odd that boost doesn't provide an easy way to access the GLOBAL_FIXTURE members.
  * But it doesn't seem to, so I'm sharing global setup stuff here.
@@ -110,6 +141,16 @@ public:
     RtpStateReplicatorListenerPrx mListenerProxy;
 
     /**
+     * Instance of our RTCP information listener.
+     */
+    TestInformationListenerPtr mInformationListener;
+
+    /**
+     * A proxy to our information listener.
+     */
+    RTCP::V1::InformationListenerPrx mInformationListenerProxy;
+
+    /**
      * A proxy to the RTP session that we requested.
      */
     RTPSessionPrx session;
@@ -275,6 +316,11 @@ struct GlobalIceFixture
 
 	    Testbed.mListenerProxy = RtpStateReplicatorListenerPrx::uncheckedCast(Testbed.adapter->addWithUUID(Testbed.mListener));
 
+            Testbed.mInformationListener = new TestInformationListener();
+
+            Testbed.mInformationListenerProxy = RTCP::V1::InformationListenerPrx::uncheckedCast(Testbed.adapter->addWithUUID(
+                                                                                                    Testbed.mInformationListener));
+
             Testbed.adapter->activate();
 
             Testbed.locator = ServiceLocatorPrx::checkedCast(Testbed.communicator->stringToProxy("LocatorService:tcp -p 4411"));
@@ -476,6 +522,9 @@ BOOST_AUTO_TEST_CASE(ConfirmInitialReplicatedRTPSession)
     StreamSourceRTPPrx source = StreamSourceRTPPrx::uncheckedCast(sources.front());
     BOOST_CHECK(Testbed.mListener->mSession->mSourceIdentity == source->ice_getIdentity());
     BOOST_CHECK(Testbed.mListener->mSession->mPort == source->getLocalPort());
+
+    BOOST_CHECK(!Testbed.mListener->mSession->mRemoteRtcpAddress.size());
+    BOOST_CHECK(!Testbed.mListener->mSession->mRemoteRtcpPort);
 }
 
 /**
@@ -569,6 +618,60 @@ BOOST_AUTO_TEST_CASE(VerifyLocalAddressonSources)
 }
 
 /**
+ * Check that we receive a local port that RTCP remote RTCP is to be sent to
+ */
+BOOST_AUTO_TEST_CASE(VerifyLocalRTCPPortonSession)
+{
+    int port = 0;
+
+    try
+    {
+        RTCP::V1::RTCPSessionPrx session = RTCP::V1::RTCPSessionPrx::checkedCast(Testbed.session,
+                                                                                 RTCP::V1::SessionFacet);
+        port = session->getLocalPort();
+    }
+    catch (const Ice::Exception &e)
+    {
+        BOOST_TEST_MESSAGE(e.ice_name());
+        BOOST_TEST_MESSAGE(e.what());
+    }
+
+    BOOST_CHECK(port);
+}
+
+/**
+ * Check that we can set remote details for RTCP and that they get replicated
+ */
+BOOST_AUTO_TEST_CASE(VerifyReplicatedRTCPRemoteDetails)
+{
+    boost::mutex::scoped_lock lock(Testbed.mLock);
+
+#ifdef IPV6_TEST
+    std::string address = "::1";
+#else
+    std::string address = "127.0.0.1";
+#endif
+
+    try
+    {
+	RTCP::V1::RTCPSessionPrx session = RTCP::V1::RTCPSessionPrx::checkedCast(Testbed.session,
+	    RTCP::V1::SessionFacet);
+
+        session->setRemoteDetails(address, 10001);
+    }
+    catch (const Ice::Exception &e)
+    {
+        BOOST_TEST_MESSAGE(e.ice_name());
+        BOOST_TEST_MESSAGE(e.what());
+    }
+
+    Testbed.mCondition.wait(lock);
+
+    BOOST_CHECK(Testbed.mListener->mSession->mRemoteRtcpAddress == address);
+    BOOST_CHECK(Testbed.mListener->mSession->mRemoteRtcpPort == 10001);
+}
+
+/**
  * Check that the RTP session has at least one sink
  */
 BOOST_AUTO_TEST_CASE(CheckForSink)
@@ -683,7 +786,7 @@ BOOST_AUTO_TEST_CASE(TransmitFrametoEmptySink)
 /**
  * Check that we can set the remote address information properly
  */
-BOOST_AUTO_TEST_CASE(ConfirmRemoteAddressSetting)
+BOOST_AUTO_TEST_CASE(ConfirmRemoteRTPAddressSetting)
 {
     bool set = false;
 
@@ -794,9 +897,102 @@ BOOST_AUTO_TEST_CASE(PushPayloadMappings)
 }
 
 /**
+ * Add RTCP information listener to source
+ */
+BOOST_AUTO_TEST_CASE(AddRTCPListenerToSource)
+{
+    bool added = false;
+
+    try
+    {
+        StreamSourceSeq sources = Testbed.session->getSources();
+        StreamSourceRTPPrx source = StreamSourceRTPPrx::uncheckedCast(sources.front());
+        RTCP::V1::InformationPrx information = RTCP::V1::InformationPrx::checkedCast(source,
+                                                                                     RTCP::V1::Facet);
+        information->addListener(Testbed.mInformationListenerProxy);
+        added = true;
+    }
+    catch (const Ice::Exception &e)
+    {
+        BOOST_TEST_MESSAGE(e.ice_name());
+        BOOST_TEST_MESSAGE(e.what());
+    }
+    catch (...)
+    {
+    }
+
+    BOOST_CHECK(added);
+}
+
+/**
+ * Add RTCP information listener to sink
+ */
+BOOST_AUTO_TEST_CASE(AddRTCPListenerToSink)
+{
+    bool added = false;
+
+    try
+    {
+        StreamSinkSeq sinks = Testbed.session->getSinks();
+        StreamSinkRTPPrx sink = StreamSinkRTPPrx::uncheckedCast(sinks.front());
+        RTCP::V1::InformationPrx information = RTCP::V1::InformationPrx::checkedCast(sink,
+                                                                                     RTCP::V1::Facet);
+        information->addListener(Testbed.mInformationListenerProxy);
+        added = true;
+    }
+    catch (const Ice::Exception &e)
+    {
+        BOOST_TEST_MESSAGE(e.ice_name());
+        BOOST_TEST_MESSAGE(e.what());
+    }
+    catch (...)
+    {
+    }
+
+    BOOST_CHECK(added);
+}
+
+/**
+ * Setup RTCP loopback from the session, to, well, the session
+ */
+BOOST_AUTO_TEST_CASE(SetupRTCPLoopback)
+{
+    bool looped = false;
+
+    try
+    {
+        StreamSourceSeq sources = Testbed.session->getSources();
+        StreamSourceRTPPrx source = StreamSourceRTPPrx::uncheckedCast(sources.front());
+        string address = source->getLocalAddress();
+
+        RTCP::V1::RTCPSessionPrx session = RTCP::V1::RTCPSessionPrx::checkedCast(Testbed.session,
+                                                                                 RTCP::V1::SessionFacet);
+        int port = session->getLocalPort();
+
+        boost::mutex::scoped_lock lock(Testbed.mLock);
+
+        session->setRemoteDetails(address, port);
+
+        looped = true;
+
+        Testbed.mCondition.wait(lock);
+    }
+    catch (const Ice::Exception &e)
+    {
+        BOOST_TEST_MESSAGE(e.ice_name());
+        BOOST_TEST_MESSAGE(e.what());
+    }
+    catch (...)
+    {
+    }
+
+    BOOST_CHECK(looped);
+}
+
+/**
  * Setup RTP loopback from the session to, well, the session
  */
-BOOST_AUTO_TEST_CASE(SetupLoopback)
+BOOST_AUTO_TEST_CASE(SetupRTPLoopback)
 {
     bool looped = false;
 
@@ -1051,6 +1247,83 @@ BOOST_AUTO_TEST_CASE(ReceiveUnknownRTPPacket)
 }
 
 /**
+ * Wait for and retrieve RTCP statistics information from the source.
+ */
+BOOST_AUTO_TEST_CASE(WaitForAndRetrieveSourceRTCPStatistics)
+{
+    RTCP::V1::StatisticsPtr statistics;
+
+    try
+    {
+        StreamSourceSeq sources = Testbed.session->getSources();
+        StreamSourceRTPPrx source = StreamSourceRTPPrx::uncheckedCast(sources.front());
+
+        RTCP::V1::InformationPrx information = RTCP::V1::InformationPrx::checkedCast(source, RTCP::V1::Facet);
+
+        // Give enough time for a few RTCP packets to be sent
+        IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(PJMEDIA_RTCP_INTERVAL * 4));
+
+        // Now retrieve the statistics and peek at them
+        statistics = information->getStatistics();
+    }
+    catch (const Ice::Exception &e)
+    {
+        BOOST_TEST_MESSAGE(e.ice_name());
+        BOOST_TEST_MESSAGE(e.what());
+    }
+    catch (...)
+    {
+    }
+
+    BOOST_CHECK(statistics);
+    BOOST_CHECK(statistics->roundTripDelay->last);
+    BOOST_CHECK(statistics->packets);
+}
+
+/**
+ * Retrieve RTCP statistics information from the sink.
+ */
+BOOST_AUTO_TEST_CASE(RetrieveSinkRTCPStatistics)
+{
+    RTCP::V1::StatisticsPtr statistics;
+
+    try
+    {
+        StreamSinkSeq sinks = Testbed.session->getSinks();
+        StreamSinkRTPPrx sink = StreamSinkRTPPrx::uncheckedCast(sinks.front());
+
+        RTCP::V1::InformationPrx information = RTCP::V1::InformationPrx::checkedCast(sink, RTCP::V1::Facet);
+        statistics = information->getStatistics();
+    }
+    catch (const Ice::Exception &e)
+    {
+        BOOST_TEST_MESSAGE(e.ice_name());
+        BOOST_TEST_MESSAGE(e.what());
+    }
+    catch (...)
+    {
+    }
+
+    BOOST_CHECK(statistics);
+    BOOST_CHECK(statistics->roundTripDelay->last);
+    BOOST_CHECK(statistics->packets);
+}
+
+/**
+ * Confirm that our information listener was called and that statistics information is proper.
+ */
+BOOST_AUTO_TEST_CASE(ConfirmInformationListenerCalled)
+{
+    BOOST_CHECK(Testbed.mInformationListener->mSourceStatistics);
+    BOOST_CHECK(Testbed.mInformationListener->mSourceStatistics->roundTripDelay->last);
+    BOOST_CHECK(Testbed.mInformationListener->mSourceStatistics->packets);
+
+    BOOST_CHECK(Testbed.mInformationListener->mSinkStatistics);
+    BOOST_CHECK(Testbed.mInformationListener->mSinkStatistics->roundTripDelay->last);
+    BOOST_CHECK(Testbed.mInformationListener->mSinkStatistics->packets);
+}
+
+/**
  * Attempt to set an IPv4 address on an IPv6 only sink OR try to set an IPv6 address
  * on an IPv4 only sink.
  */

-----------------------------------------------------------------------


-- 
asterisk-scf/release/media_rtp_pjmedia.git



More information about the asterisk-scf-commits mailing list