[asterisk-scf-commits] asterisk-scf/integration/media_rtp_pjmedia.git branch "alternate_source_cleanup" updated.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Mon Jul 9 15:10:32 CDT 2012
branch "alternate_source_cleanup" has been updated
via f0e6255ca9fe653b034055d82df3cd48319ad3dc (commit)
via 275d7d2461aee6c129f4f4d5e5bfa2e86b93166d (commit)
from d866db0ec2b66194d3f646b7c330959b21b8b3b4 (commit)
Summary of changes:
src/CMakeLists.txt | 2 +
src/PJMEDIATransport.cpp | 16 --
src/PJMEDIATransport.h | 4 -
src/RTPSession.cpp | 6 -
src/RTPSource.cpp | 435 ++++++++++++++--------------------------------
src/RTPSource.h | 6 +-
src/TransportMap.cpp | 266 ++++++++++++++++++++++++++++
src/TransportMap.h | 90 ++++++++++
test/TestRTPpjmedia.cpp | 5 +
9 files changed, 497 insertions(+), 333 deletions(-)
mode change 100644 => 100755 config/RTPConfigurator.py
mode change 100644 => 100755 src/Configuration.h
create mode 100755 src/TransportMap.cpp
create mode 100755 src/TransportMap.h
- Log -----------------------------------------------------------------
commit f0e6255ca9fe653b034055d82df3cd48319ad3dc
Author: Brent Eagles <beagles at digium.com>
Date: Mon Jul 9 17:30:47 2012 -0230
Fixed several silly mistakes with rtp/rtcp setup.
diff --git a/src/RTPSource.cpp b/src/RTPSource.cpp
index 5bacf4b..c4900d9 100644
--- a/src/RTPSource.cpp
+++ b/src/RTPSource.cpp
@@ -464,7 +464,7 @@ Ice::Int StreamSourceRTPImpl::getLocalPort(const Ice::Current&)
*/
void StreamSourceRTPImpl::setRemoteDetails(const string& address, Ice::Int port)
{
- TransportMap::instance().addTransport(MapRecord(mImpl->mTransport, this), address, port);
+ TransportMap::instance().addTransport(MapRecord(mImpl->mTransport, this), address, port, false);
}
@@ -473,7 +473,15 @@ void StreamSourceRTPImpl::setRemoteDetails(const string& address, Ice::Int port)
*/
void StreamSourceRTPImpl::setRemoteRtcpDetails(const std::string& address, Ice::Int port)
{
- TransportMap::instance().addTransport(MapRecord(mImpl->mTransport, this), address, port);
+ TransportMap::instance().addTransport(MapRecord(mImpl->mTransport, this), address, port, true);
+ if (!mImpl->mTimer && (mImpl->mTimer = new IceUtil::Timer()))
+ {
+ RtcpTransmissionPtr transmission;
+ if ((transmission = new RtcpTransmission(mImpl->mSessionAdapter, mImpl->mSink, mImpl->mTransport, mImpl->mThreadDescriptor)))
+ {
+ mImpl->mTimer->scheduleRepeated(transmission, IceUtil::Time::milliSeconds(PJMEDIA_RTCP_INTERVAL));
+ }
+ }
}
/**
@@ -683,6 +691,7 @@ bool StreamSourceRTPImpl::receiveRTP(void *userdata, void *packet, size_t size)
lg(Error) << "Exception caught while attempting to write media to RTP sink " << (*sink);
}
}
+ return true;
}
/**
@@ -714,4 +723,5 @@ bool StreamSourceRTPImpl::receiveRTCP(void *userdata, void *packet, size_t size)
{
(*listener)->sourceStatisticsUpdated(AsteriskSCF::Operations::createContext(), mImpl->mSource, statistics);
}
+ return true;
}
diff --git a/src/TransportMap.cpp b/src/TransportMap.cpp
index 923e344..07d5045 100755
--- a/src/TransportMap.cpp
+++ b/src/TransportMap.cpp
@@ -61,7 +61,7 @@ bool isCompatible(const pj_sockaddr& addr, const pjmedia_transport_info& info)
}
-void TransportMap::addTransport(const MapRecord& r, const string& ip, Ice::Int port)
+void TransportMap::addTransport(const MapRecord& r, const string& ip, Ice::Int port, bool rtcp)
{
assert(r.transport);
assert(r.source);
@@ -85,33 +85,37 @@ void TransportMap::addTransport(const MapRecord& r, const string& ip, Ice::Int p
//
// Lazy initialization of port subtables.
//
- if (mTable[tblIndex].size() < arrayIndex)
+ if (mTable[tblIndex].size() < (arrayIndex + 1))
{
- mTable[tblIndex].resize(arrayIndex);
+ mTable[tblIndex].resize(arrayIndex + 1);
}
MapRecord oldRecord = mTable[tblIndex][arrayIndex];
- if (oldRecord.transport)
+ if (oldRecord.transport != 0)
{
- if (oldRecord.transport == r.transport && oldRecord.source == r.source && port == address->port())
+ lg(Debug) << "Updating existing transport entry";
+ AddressPtr oldAddr = oldRecord.transport->localAddress();
+ if (oldAddr)
{
- //
- // We do not have to do anything here because doing what
- // comes next will not change anything. This can happen if
- // setRtcpRemoteDetails is called in addition to
- // setRemoteDetails.
- //
- return;
+ pjmedia_transport* t = oldRecord.transport->getTransport();
+ pjmedia_transport_detach(t, reinterpret_cast<void*>(oldAddr->port()));
}
- lg(Debug) << "Overwriting existing transport entry and detaching transport.";
- pjmedia_transport* t = oldRecord.transport->getTransport();
- pjmedia_transport_detach(t, reinterpret_cast<void*>(port));
}
mTable[tblIndex][arrayIndex] = r;
- pj_status_t result = pjmedia_transport_attach(r.transport->getTransport(),
- reinterpret_cast<void*>(port), &addr, &info.src_rtcp_name, pj_sockaddr_get_len(&addr), &TransportMap::receiveRTP,
- &TransportMap::receiveRTCP);
+ pj_status_t result;
+ if (rtcp)
+ {
+ result = pjmedia_transport_attach(r.transport->getTransport(),
+ reinterpret_cast<void*>(address->port()), &info.src_rtp_name, &addr, pj_sockaddr_get_len(&addr), &TransportMap::receiveRTP,
+ &TransportMap::receiveRTCP);
+ }
+ else
+ {
+ result = pjmedia_transport_attach(r.transport->getTransport(),
+ reinterpret_cast<void*>(address->port()), &addr, &info.src_rtcp_name, pj_sockaddr_get_len(&addr), &TransportMap::receiveRTP,
+ &TransportMap::receiveRTCP);
+ }
if (result != PJ_SUCCESS)
{
//
@@ -123,8 +127,11 @@ void TransportMap::addTransport(const MapRecord& r, const string& ip, Ice::Int p
mTable[tblIndex][arrayIndex] = MapRecord();
throw AsteriskSCF::Media::RTP::V1::InvalidAddress();
}
-
- incEntries();
+
+ if (oldRecord.transport == 0)
+ {
+ incEntries();
+ }
}
void TransportMap::removeTransport(const MapRecord& r)
@@ -134,13 +141,13 @@ void TransportMap::removeTransport(const MapRecord& r)
assert(address);
unsigned tblIndex = address->port() / 1000;
unsigned arrayIndex = address->port() % 1000;
- if (mTable[tblIndex].size() < arrayIndex || !mTable[tblIndex][arrayIndex].transport)
+ if (mTable[tblIndex].size() < (arrayIndex + 1) || mTable[tblIndex][arrayIndex].transport == 0)
{
lg(Debug) << "Attempting to remove a non-existant transport";
return;
}
MapRecord toRemove = mTable[tblIndex][arrayIndex];
- if (toRemove.transport)
+ if (toRemove.transport != 0)
{
pjmedia_transport* t = toRemove.transport->getTransport();
if (t)
@@ -164,8 +171,9 @@ TransportMap& TransportMap::instance()
TransportMap TransportMap::mInstance;
TransportMap::TransportMap()
+ : mTable(65)
#ifndef _NDEBUG
- : mCounter(0)
+ , mCounter(0)
#endif
{
}
@@ -196,7 +204,7 @@ MapRecord TransportMap::get(const Ice::Int port)
// Bounds check
//
assert(tblIndex >= 0 && tblIndex <= mTable.size());
- if (mTable[tblIndex].empty() || mTable[tblIndex].size() < arrayIndex)
+ if (mTable[tblIndex].size() < (arrayIndex + 1))
{
throw TransportNotFound();
}
@@ -218,7 +226,7 @@ void TransportMap::receiveRTP(void* userdata, void* packet, pj_ssize_t size)
return;
}
- int portKey = reinterpret_cast<int>(userdata);
+ long portKey = reinterpret_cast<long>(userdata);
try
{
MapRecord transportRec = TransportMap::instance().get(portKey);
@@ -242,7 +250,7 @@ void TransportMap::receiveRTCP(void* userdata, void* packet, pj_ssize_t size)
return;
}
- int portKey = reinterpret_cast<int>(userdata);
+ long portKey = reinterpret_cast<long>(userdata);
try
{
MapRecord transportRec = TransportMap::instance().get(portKey);
diff --git a/src/TransportMap.h b/src/TransportMap.h
index dd5b9b3..bcbf5a7 100755
--- a/src/TransportMap.h
+++ b/src/TransportMap.h
@@ -19,7 +19,6 @@
#include "PJMEDIATransport.h"
#include "RTPSource.h"
#include <vector>
-#include <array>
#include <boost/thread.hpp>
#include <pjlib.h>
@@ -48,7 +47,7 @@ class TransportMap
{
public:
- void addTransport(const MapRecord& transport, const std::string& address, Ice::Int port);
+ void addTransport(const MapRecord& transport, const std::string& address, Ice::Int port, bool rtcp);
void removeTransport(const MapRecord& transport);
static TransportMap& instance();
@@ -61,7 +60,7 @@ private:
// that an RTP component that loads up will likely load up again!
//
typedef std::vector<MapRecord> TransportArray;
- typedef std::array<TransportArray, 65> TransportTable;
+ typedef std::vector<TransportArray> TransportTable;
boost::shared_mutex mLock;
TransportTable mTable;
diff --git a/test/TestRTPpjmedia.cpp b/test/TestRTPpjmedia.cpp
index 6419848..5fe90ae 100644
--- a/test/TestRTPpjmedia.cpp
+++ b/test/TestRTPpjmedia.cpp
@@ -412,6 +412,11 @@ BOOST_AUTO_TEST_CASE(RTPSessionRetry)
BOOST_CHECK_EQUAL(expected, actual);
expected->release();
}
+ catch (const std::exception& ex)
+ {
+ expected->release();
+ BOOST_FAIL(ex.what());
+ }
catch(...)
{
expected->release();
commit 275d7d2461aee6c129f4f4d5e5bfa2e86b93166d
Author: Brent Eagles <beagles at digium.com>
Date: Mon Jul 9 15:52:06 2012 -0230
Add a TransportMap helper class to help cleanup the RTPSession shutdown code.
diff --git a/config/RTPConfigurator.py b/config/RTPConfigurator.py
old mode 100644
new mode 100755
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 06d14a3..265a3be 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -44,6 +44,8 @@ astscf_component_add_files(MediaRTPPJMEDIA SRTPConfiguration.cpp)
astscf_component_add_files(MediaRTPPJMEDIA SRTPConfiguration.h)
astscf_component_add_files(MediaRTPPJMEDIA SRTPTransport.cpp)
astscf_component_add_files(MediaRTPPJMEDIA SRTPTransport.h)
+astscf_component_add_files(MediaRTPPJMEDIA TransportMap.cpp)
+astscf_component_add_files(MediaRTPPJMEDIA TransportMap.h)
astscf_component_add_slices(MediaRTPPJMEDIA PROJECT AsteriskSCF/Replication/MediaRTPPJMEDIA/RTPStateReplicationIf.ice)
astscf_component_add_slices(MediaRTPPJMEDIA PROJECT AsteriskSCF/Configuration/MediaRTPPJMEDIA/RTPConfigurationIf.ice)
astscf_component_add_boost_libraries(MediaRTPPJMEDIA core thread)
diff --git a/src/Configuration.h b/src/Configuration.h
old mode 100644
new mode 100755
diff --git a/src/PJMEDIATransport.cpp b/src/PJMEDIATransport.cpp
index c0047b4..2546b8b 100644
--- a/src/PJMEDIATransport.cpp
+++ b/src/PJMEDIATransport.cpp
@@ -65,22 +65,6 @@ AddressPtr PJMEDIATransport::remoteAddress()
return fromInfo(info);
}
-void PJMEDIATransport::attachLabel(const std::string& label)
-{
- boost::shared_ptr<char> data(new char[label.size() + 1]);
- strcpy(data.get(), label.c_str());
- if (mLabelData)
- {
- pjmedia_transport_detach(mTransport, mLabelData.get());
- }
- mLabelData = data;
-}
-
-boost::shared_ptr<char> PJMEDIATransport::getLabelData()
-{
- return mLabelData;
-}
-
AddressPtr PJMEDIATransport::getLocalAddressImpl()
{
pjmedia_transport_info info;
diff --git a/src/PJMEDIATransport.h b/src/PJMEDIATransport.h
index f4754f5..ca4889f 100644
--- a/src/PJMEDIATransport.h
+++ b/src/PJMEDIATransport.h
@@ -54,13 +54,9 @@ public:
* down the objects with lots of unnecessary code.
**/
virtual void addFacets(const Ice::ObjectAdapterPtr&, const Ice::Identity&) {}
-
- void attachLabel(const std::string& label);
- boost::shared_ptr<char> getLabelData();
protected:
pjmedia_transport* mTransport;
- boost::shared_ptr<char> mLabelData;
PJMEDIATransport(pjmedia_transport* t);
diff --git a/src/RTPSession.cpp b/src/RTPSession.cpp
index 2221b5e..e0c26cc 100644
--- a/src/RTPSession.cpp
+++ b/src/RTPSession.cpp
@@ -780,8 +780,6 @@ void RTPSessionImpl::release(const Ice::Current&)
{
// Remove everything from the state replicator if present
removeState(mSessionStateItem, mStreamSink->getStateItem(), mStreamSource->getStateItem());
-
-
destroy();
}
@@ -1209,10 +1207,6 @@ RTPSessionPrx RTPSessionImpl::activate(
}
RTPSessionPrx result = RTPSessionPrx::uncheckedCast(mAdapter->add(this, id));
mTransport->addFacets(mAdapter, id);
- //
- // The source is an active element and must be explicitly "turned on"
- //
- mStreamSource->activate();
return result;
}
catch (...)
diff --git a/src/RTPSource.cpp b/src/RTPSource.cpp
index a53022b..5bacf4b 100644
--- a/src/RTPSource.cpp
+++ b/src/RTPSource.cpp
@@ -44,6 +44,8 @@
#include <AsteriskSCF/System/Component/ReplicaIf.h>
#include <AsteriskSCF/Operations/OperationContext.h>
+#include "TransportMap.h"
+
using namespace std;
using namespace AsteriskSCF::Core::Discovery::V1;
using namespace AsteriskSCF::Media;
@@ -64,7 +66,6 @@ using namespace AsteriskSCF::SessionCommunications::V1;
namespace
{
Logger lg = getLoggerFactory().getLogger("AsteriskSCF.MediaRTP");
-
}
/**
@@ -84,7 +85,6 @@ public:
*/
typedef boost::shared_ptr<ThreadDescWrapper> ThreadDescWrapperPtr;
-
/**
* TimerTask implementation which sends RTCP at a defined interval.
*/
@@ -263,49 +263,6 @@ public:
typedef boost::shared_ptr<StreamSourceRTPImplPriv> StreamSourceRTPImplPrivPtr;
/**
- * A helper class to help keep track of classes that are associated with a
- * media session without having memory management headaches of associating
- * pointers to live objects with pjmedia transports.
- */
-class StreamTable
-{
-public:
- void add(const std::string& lbl, const StreamSourceRTPImplPrivPtr& info)
- {
- boost::unique_lock<boost::shared_mutex> lock(mLock);
- InfoMap::const_iterator iter = mMap.find(lbl);
- if (iter == mMap.end())
- {
- mMap[lbl] = info;
- }
- }
-
- void remove(const std::string& lbl)
- {
- boost::unique_lock<boost::shared_mutex> lock(mLock);
- mMap.erase(lbl);
- }
-
- StreamSourceRTPImplPrivPtr get(const std::string& lbl)
- {
- boost::shared_lock<boost::shared_mutex> lock(mLock);
- InfoMap::const_iterator iter = mMap.find(lbl);
- if (iter != mMap.end())
- {
- return iter->second;
- }
- return StreamSourceRTPImplPrivPtr();
- }
-public:
- boost::shared_mutex mLock;
-
- typedef std::map<std::string, StreamSourceRTPImplPrivPtr> InfoMap;
- InfoMap mMap;
-};
-
-StreamTable streamTable;
-
-/**
* Constructor for the StreamSourceRTPImplPriv class.
*/
StreamSourceRTPImplPriv::StreamSourceRTPImplPriv(const SessionAdapterPtr& session,
@@ -344,8 +301,7 @@ StreamSourceRTPImpl::StreamSourceRTPImpl(const SessionAdapterPtr& session,
const PJMEDIATransportPtr& transport, const string& sessionId,
const StreamSourceRTPPrx& source,
const StreamSinkRTPPrx& sink) :
- mImpl(new StreamSourceRTPImplPriv(session, transport, sessionId, source, sink)),
- mLabelBuffer(0)
+ mImpl(new StreamSourceRTPImplPriv(session, transport, sessionId, source, sink))
{
}
@@ -502,29 +458,106 @@ Ice::Int StreamSourceRTPImpl::getLocalPort(const Ice::Current&)
return 0;
}
+
/**
- * Function which is called when RTP media is received.
+ * API call which sets up our pjmedia transport and allows media to be sent and received.
*/
-static void receiveRTP(void *userdata, void *packet, pj_ssize_t size)
+void StreamSourceRTPImpl::setRemoteDetails(const string& address, Ice::Int port)
{
- /* Ensure that no errors occurred when reading this packet in */
- if (size < 0)
+ TransportMap::instance().addTransport(MapRecord(mImpl->mTransport, this), address, port);
+}
+
+
+/**
+ * API call which sets up our pjmedia transport and allows media to be sent and received.
+ */
+void StreamSourceRTPImpl::setRemoteRtcpDetails(const std::string& address, Ice::Int port)
+{
+ TransportMap::instance().addTransport(MapRecord(mImpl->mTransport, this), address, port);
+}
+
+/**
+ * API call which returns a pointer to the source state item.
+ */
+RTPStreamSourceStateItemPtr StreamSourceRTPImpl::getStateItem()
+{
+ boost::shared_lock<boost::shared_mutex> lock(mImpl->mLock);
+ return mImpl->mSourceStateItem;
+}
+
+RTPTelephonyEventSourceStateItemPtr StreamSourceRTPImpl::getTelephonyEventSourceStateItem()
+{
+ boost::shared_lock<boost::shared_mutex> lock(mImpl->mLock);
+ if (mImpl->mDestroyed && mImpl->mTelephonyEventSource)
{
- lg(Error) << "We attempted to read data from an RTP session but failed.";
- return;
+ return mImpl->mTelephonyEventSource->getStateItem();
}
+ return 0;
+}
- const char* infoLabel = static_cast<const char*>(userdata);
- if (!infoLabel)
+void StreamSourceRTPImpl::setSinksImpl(const AsteriskSCF::Media::V1::StreamSinkSeq& sinks)
+{
+ boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
+ if (!mImpl->mDestroyed)
{
- lg(Error) << "Attempted to process an RTP packet with no associated session information.";
- return;
+ mImpl->mSourceStateItem->sinks = sinks;
}
- StreamSourceRTPImplPrivPtr impl = streamTable.get(infoLabel);
- if (!impl)
+}
+
+FramePayloadPtr StreamSourceRTPImplPriv::decodeAudioPayload(const Ice::ByteSeq& toDecode, const AudioFormatPtr& audioFormat)
+{
+ if (audioFormat->sampleSize == 8)
{
- return;
+ return new ByteSeqPayload(toDecode);
+ }
+ else if (audioFormat->sampleSize == 16)
+ {
+ Ice::ShortSeq shortPayload((Ice::Short*) &toDecode.front(),
+ (Ice::Short*) &toDecode[toDecode.size()]);
+
+ std::transform(shortPayload.begin(), shortPayload.end(),
+ shortPayload.begin(), boost::asio::detail::socket_ops::network_to_host_short);
+
+ return new ShortSeqPayload(shortPayload);
+ }
+ return 0;
+}
+
+FramePayloadPtr StreamSourceRTPImplPriv::decodeVideoPayload(const Ice::ByteSeq& toDecode, const VideoFormatPtr&)
+{
+ //Assume for now video payloads use 8-bit samples...
+ return new ByteSeqPayload(toDecode);
+}
+
+void StreamSourceRTPImpl::destroy()
+{
+ PJMEDIATransportPtr transport;
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
+ if (mImpl->mDestroyed)
+ {
+ return;
+ }
+ transport = mImpl->mTransport;
+ mImpl->mDestroyed = true;
+ mImpl->mSourceStateItem = 0;
+ mImpl->mTelephonyEventSource = 0;
+ mImpl->mSessionAdapter.reset();
+ // Destroy the RTCP transmission timer if it exists
+ if (mImpl->mTimer)
+ {
+ mImpl->mTimer->destroy();
+ mImpl->mTimer = 0;
+ }
}
+ TransportMap::instance().removeTransport(MapRecord(transport, this));
+}
+
+/**
+ * Function which is called when RTP media is received.
+ */
+bool StreamSourceRTPImpl::receiveRTP(void *userdata, void *packet, size_t size)
+{
//////////////////////////////////////////////////////////////////////////////////////////////////
// WARNING!
// There is a lot of stuff happening from here down that is done with
@@ -536,30 +569,30 @@ static void receiveRTP(void *userdata, void *packet, pj_ssize_t size)
Ice::Byte* payload;
unsigned int payload_size;
- boost::shared_lock<boost::shared_mutex> lock(impl->mLock);
- if (impl->mDestroyed)
+ boost::shared_lock<boost::shared_mutex> lock(mImpl->mLock);
+ if (mImpl->mDestroyed)
{
- return;
+ return false;
}
/* We have to cast 'size' to an 'int' here so the compiler won't warn about
* doing it implicitly.
*/
- pj_status_t status = pjmedia_rtp_decode_rtp(&impl->mIncomingSession, packet, (int) size, &header,
+ pj_status_t status = pjmedia_rtp_decode_rtp(&mImpl->mIncomingSession, packet, (int) size, &header,
(const void**)&payload, &payload_size);
if (status != PJ_SUCCESS)
{
lg(Error) << "We read an RTP packet of size " << size << " in but failed to decode it.";
- return;
+ return true;
}
// Update RTP stack information before writing to sinks, it's fine to do it
pjmedia_rtp_status rtpStatus;
- pjmedia_rtp_session_update2(&impl->mIncomingSession, header, &rtpStatus, PJ_FALSE);
+ pjmedia_rtp_session_update2(&mImpl->mIncomingSession, header, &rtpStatus, PJ_FALSE);
// Update RTCP information
- pjmedia_rtcp_rx_rtp2(impl->mSessionAdapter->getRtcpSession(), pj_ntohs(header->seq), pj_ntohl(header->ts),
+ pjmedia_rtcp_rx_rtp2(mImpl->mSessionAdapter->getRtcpSession(), pj_ntohs(header->seq), pj_ntohl(header->ts),
payload_size, ((rtpStatus.status.value && rtpStatus.status.flag.bad) || !payload_size)
? PJ_TRUE : PJ_FALSE);
@@ -567,29 +600,29 @@ static void receiveRTP(void *userdata, void *packet, pj_ssize_t size)
if (rtpStatus.status.value && rtpStatus.status.flag.badssrc)
{
std::vector<AsteriskSCF::Media::RTCP::V1::InformationListenerPrx> listeners =
- impl->mSessionAdapter->getReceiverReportListeners();
+ mImpl->mSessionAdapter->getReceiverReportListeners();
for (std::vector<AsteriskSCF::Media::RTCP::V1::InformationListenerPrx>::const_iterator listener = listeners.begin();
listener != listeners.end();
++listener)
{
- (*listener)->sourceSsrcChanged(AsteriskSCF::Operations::createContext(), impl->mSource,
- impl->mIncomingSession.peer_ssrc);
+ (*listener)->sourceSsrcChanged(AsteriskSCF::Operations::createContext(), mImpl->mSource,
+ mImpl->mIncomingSession.peer_ssrc);
}
}
- if (impl->mSourceStateItem->sinks.empty())
+ if (mImpl->mSourceStateItem->sinks.empty())
{
// No sinks present so frames can not go anywhere
- return;
+ return true;
}
- FormatPtr mediaformat = impl->mSessionAdapter->getFormat(header->pt);
+ FormatPtr mediaformat = mImpl->mSessionAdapter->getFormat(header->pt);
if (!mediaformat)
{
// If this is for a payload we don't know about just drop the frame
- return;
+ return true;
}
FrameSeq frames;
@@ -603,7 +636,7 @@ static void receiveRTP(void *userdata, void *packet, pj_ssize_t size)
AudioFramePtr frame = new AudioFrame();
Ice::ByteSeq bytePayload(payload, payload + payload_size);
- frame->payload = impl->decodeAudioPayload(bytePayload, audioformat);
+ frame->payload = mImpl->decodeAudioPayload(bytePayload, audioformat);
// Populate the common data
frame->mediaFormat = mediaformat;
@@ -618,7 +651,7 @@ static void receiveRTP(void *userdata, void *packet, pj_ssize_t size)
VideoFramePtr frame = new VideoFrame();
Ice::ByteSeq bytePayload(payload, payload + payload_size);
- frame->payload = impl->decodeVideoPayload(bytePayload, videoformat);
+ frame->payload = mImpl->decodeVideoPayload(bytePayload, videoformat);
frame->mediaFormat = mediaformat;
frame->timestamp = header->ts;
@@ -628,17 +661,17 @@ static void receiveRTP(void *userdata, void *packet, pj_ssize_t size)
}
else if ((rfc4733 = RFC4733Ptr::dynamicCast(mediaformat)))
{
- impl->mTelephonyEventSource->read(header, payload);
+ mImpl->mTelephonyEventSource->read(header, payload);
}
if (frames.empty())
{
// If the media format ended up being a type we don't understand don't bother writing it out
- return;
+ return true;
}
- for (StreamSinkSeq::iterator sink = impl->mSourceStateItem->sinks.begin();
- sink != impl->mSourceStateItem->sinks.end();
+ for (StreamSinkSeq::iterator sink = mImpl->mSourceStateItem->sinks.begin();
+ sink != mImpl->mSourceStateItem->sinks.end();
++sink)
{
try
@@ -655,244 +688,30 @@ static void receiveRTP(void *userdata, void *packet, pj_ssize_t size)
/**
* Function which is called when RTCP is received.
*/
-static void receiveRTCP(void *userdata, void *packet, pj_ssize_t size)
+bool StreamSourceRTPImpl::receiveRTCP(void *userdata, void *packet, size_t size)
{
- /* Ensure that no errors occurred when reading this packet in */
- if (size < 0)
- {
- lg(Error) << "We attempted to read data from an RTCP session but failed.";
- return;
- }
-
- const char* infoLabel = static_cast<const char*>(userdata);
- if (!infoLabel)
- {
- lg(Error) << "Attempted to process an RTP packet with no associated session information.";
- return;
- }
- StreamSourceRTPImplPrivPtr impl = streamTable.get(infoLabel);
- if (!impl)
- {
- return;
- }
-
- boost::shared_lock<boost::shared_mutex> lock(impl->mLock);
- if (impl->mDestroyed)
+ boost::shared_lock<boost::shared_mutex> lock(mImpl->mLock);
+ if (mImpl->mDestroyed)
{
- return;
+ return false;
}
- pjmedia_rtcp_rx_rtcp(impl->mSessionAdapter->getRtcpSession(), packet, size);
+ pjmedia_rtcp_rx_rtcp(mImpl->mSessionAdapter->getRtcpSession(), packet, size);
std::vector<AsteriskSCF::Media::RTCP::V1::InformationListenerPrx> listeners =
- impl->mSessionAdapter->getReceiverReportListeners();
+ mImpl->mSessionAdapter->getReceiverReportListeners();
if (listeners.empty())
{
- return;
+ return true;
}
- AsteriskSCF::Media::RTCP::V1::StatisticsPtr statistics = impl->mSessionAdapter->getReceiverReportStatistics();
+ AsteriskSCF::Media::RTCP::V1::StatisticsPtr statistics = mImpl->mSessionAdapter->getReceiverReportStatistics();
for (std::vector<AsteriskSCF::Media::RTCP::V1::InformationListenerPrx>::const_iterator listener = listeners.begin();
listener != listeners.end();
++listener)
{
- (*listener)->sourceStatisticsUpdated(AsteriskSCF::Operations::createContext(), impl->mSource, statistics);
- }
-}
-
-/**
- * API call which sets up our pjmedia transport and allows media to be sent and received.
- */
-void StreamSourceRTPImpl::setRemoteDetails(const string& address, Ice::Int port)
-{
- pj_sockaddr addr;
-
- /* This feels so dirty but convert from our std::string to a pj_str, since their API requires it. */
- pj_str_t tmpAddress;
- pj_strset(&tmpAddress, (char*)address.c_str(), address.size());
-
- /* Now for the next trick - convert into a pj_sockaddr so we can pass it to pjmedia_transport_attach */
- pj_status_t status = pj_sockaddr_parse(pj_AF_UNSPEC(), 0, &tmpAddress, &addr);
-
- if (status != PJ_SUCCESS)
- {
- throw InvalidAddress();
- }
-
- // Confirm that the address family of the address matches that of this RTP session
- pjmedia_transport_info transportInfo;
-
- pjmedia_transport_info_init(&transportInfo);
- pjmedia_transport_get_info(mImpl->mTransport->getTransport(), &transportInfo);
-
- if (transportInfo.sock_info.rtp_addr_name.addr.sa_family != addr.addr.sa_family)
- {
- throw InvalidAddress();
- }
-
- pj_sockaddr_set_port(&addr, static_cast<pj_uint16_t>(port));
-
- /* In case we were already attached go ahead and detach */
- mImpl->mTransport->attachLabel(mImpl->mSessionId);
-
- /* All ready... actually do it! */
- status = pjmedia_transport_attach(mImpl->mTransport->getTransport(),
- mImpl->mTransport->getLabelData().get(), &addr, &transportInfo.src_rtcp_name,
- pj_sockaddr_get_len(&addr), &receiveRTP, &receiveRTCP);
-
- if (status != PJ_SUCCESS)
- {
- throw InvalidAddress();
- }
-}
-
-/**
- * API call which sets up our pjmedia transport and allows media to be sent and received.
- */
-void StreamSourceRTPImpl::setRemoteRtcpDetails(const std::string& address, Ice::Int port)
-{
- pj_sockaddr addr;
-
- /* This feels so dirty but convert from our std::string to a pj_str, since their API requires it. */
- pj_str_t tmpAddress;
- pj_strset(&tmpAddress, (char*)address.c_str(), address.size());
-
- /* Now for the next trick - convert into a pj_sockaddr so we can pass it to pjmedia_transport_attach */
- pj_status_t status = pj_sockaddr_parse(pj_AF_UNSPEC(), 0, &tmpAddress, &addr);
-
- if (status != PJ_SUCCESS)
- {
- throw InvalidAddress();
- }
-
- // Confirm that the address family of the address matches that of this RTP session
- pjmedia_transport_info transportInfo;
-
- pjmedia_transport_info_init(&transportInfo);
- pjmedia_transport_get_info(mImpl->mTransport->getTransport(), &transportInfo);
-
- if (transportInfo.sock_info.rtcp_addr_name.addr.sa_family != addr.addr.sa_family)
- {
- throw InvalidAddress();
- }
-
- pj_sockaddr_set_port(&addr, static_cast<pj_uint16_t>(port));
-
- /* In case we were already attached go ahead and detach */
- mImpl->mTransport->attachLabel(mImpl->mSessionId);
-
- /* All ready... actually do it! */
- status = pjmedia_transport_attach(mImpl->mTransport->getTransport(), mImpl->mTransport->getLabelData().get(), &transportInfo.src_rtp_name, &addr,
- pj_sockaddr_get_len(&addr), &receiveRTP, &receiveRTCP);
-
- if (status != PJ_SUCCESS)
- {
- throw InvalidAddress();
- }
-
- // If RTCP is not already being sent start sending it
- if (!mImpl->mTimer && (mImpl->mTimer = new IceUtil::Timer()))
- {
- RtcpTransmissionPtr transmission;
-
- if ((transmission = new RtcpTransmission(mImpl->mSessionAdapter, mImpl->mSink, mImpl->mTransport, mImpl->mThreadDescriptor)))
- {
- mImpl->mTimer->scheduleRepeated(transmission, IceUtil::Time::milliSeconds(PJMEDIA_RTCP_INTERVAL));
- }
- }
-}
-
-/**
- * API call which returns a pointer to the source state item.
- */
-RTPStreamSourceStateItemPtr StreamSourceRTPImpl::getStateItem()
-{
- boost::shared_lock<boost::shared_mutex> lock(mImpl->mLock);
- return mImpl->mSourceStateItem;
-}
-
-RTPTelephonyEventSourceStateItemPtr StreamSourceRTPImpl::getTelephonyEventSourceStateItem()
-{
- boost::shared_lock<boost::shared_mutex> lock(mImpl->mLock);
- if (mImpl->mDestroyed && mImpl->mTelephonyEventSource)
- {
- return mImpl->mTelephonyEventSource->getStateItem();
- }
- return 0;
-}
-
-void StreamSourceRTPImpl::setSinksImpl(const AsteriskSCF::Media::V1::StreamSinkSeq& sinks)
-{
- boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
- if (!mImpl->mDestroyed)
- {
- mImpl->mSourceStateItem->sinks = sinks;
- }
-}
-
-FramePayloadPtr StreamSourceRTPImplPriv::decodeAudioPayload(const Ice::ByteSeq& toDecode, const AudioFormatPtr& audioFormat)
-{
- if (audioFormat->sampleSize == 8)
- {
- return new ByteSeqPayload(toDecode);
- }
- else if (audioFormat->sampleSize == 16)
- {
- Ice::ShortSeq shortPayload((Ice::Short*) &toDecode.front(),
- (Ice::Short*) &toDecode[toDecode.size()]);
-
- std::transform(shortPayload.begin(), shortPayload.end(),
- shortPayload.begin(), boost::asio::detail::socket_ops::network_to_host_short);
-
- return new ShortSeqPayload(shortPayload);
- }
- return 0;
-}
-
-FramePayloadPtr StreamSourceRTPImplPriv::decodeVideoPayload(const Ice::ByteSeq& toDecode, const VideoFormatPtr&)
-{
- //Assume for now video payloads use 8-bit samples...
- return new ByteSeqPayload(toDecode);
-}
-
-void StreamSourceRTPImpl::destroy()
-{
- string lbl;
- {
- boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
- if (mImpl->mDestroyed)
- {
- return;
- }
- lbl = mImpl->mSessionId;
- mImpl->mDestroyed = true;
- mImpl->mSourceStateItem = 0;
- mImpl->mTelephonyEventSource = 0;
- mImpl->mSessionAdapter.reset();
- // Destroy the RTCP transmission timer if it exists
- if (mImpl->mTimer)
- {
- mImpl->mTimer->destroy();
- mImpl->mTimer = 0;
- }
- }
- streamTable.remove(lbl);
-}
-
-boost::shared_ptr<StreamSourceRTPImplPriv> StreamSourceRTPImpl::getImpl()
-{
- return mImpl;
-}
-
-void StreamSourceRTPImpl::activate()
-{
- boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
- if (!mLabelBuffer)
- {
- mLabelBuffer = new char[mImpl->mSessionId.size() + 1];
- strcpy(mLabelBuffer, mImpl->mSessionId.c_str());
- streamTable.add(mImpl->mSessionId, mImpl);
+ (*listener)->sourceStatisticsUpdated(AsteriskSCF::Operations::createContext(), mImpl->mSource, statistics);
}
}
diff --git a/src/RTPSource.h b/src/RTPSource.h
index 4f6338b..e84029b 100644
--- a/src/RTPSource.h
+++ b/src/RTPSource.h
@@ -59,12 +59,10 @@ public:
*/
void destroy();
- void activate();
-
- boost::shared_ptr<StreamSourceRTPImplPriv> getImpl();
+ bool receiveRTP(void* userData, void* packet, size_t size);
+ bool receiveRTCP(void* userData, void* packet, size_t size);
private:
- char* mLabelBuffer;
/**
* Private implementation data for StreamSourceRTPImpl.
diff --git a/src/TransportMap.cpp b/src/TransportMap.cpp
new file mode 100755
index 0000000..923e344
--- /dev/null
+++ b/src/TransportMap.cpp
@@ -0,0 +1,258 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2010, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+
+#include "TransportMap.h"
+
+#include <AsteriskSCF/Logger.h>
+#include <AsteriskSCF/Helpers/Network.h>
+
+#include <pjlib.h>
+#include <pjmedia.h>
+
+using namespace AsteriskSCF::PJMEDIARTP;
+using namespace AsteriskSCF::System::Logging;
+using namespace AsteriskSCF::Helpers;
+using namespace std;
+
+namespace
+{
+Logger lg = getLoggerFactory().getLogger("AsteriskSCF.MediaRTP");
+
+struct TransportNotFound
+{
+};
+
+
+void initAddr(pj_sockaddr& a, const string& addr, int port)
+{
+ pj_str_t t;
+ pj_strset(&t, (char*)addr.c_str(), addr.size());
+ pj_status_t result = pj_sockaddr_parse(pj_AF_UNSPEC(), 0, &t, &a);
+ if (result != PJ_SUCCESS)
+ {
+ throw AsteriskSCF::Media::RTP::V1::InvalidAddress();
+ }
+ pj_sockaddr_set_port(&a, static_cast<pj_uint16_t>(port));
+}
+
+void initTransportInfo(pjmedia_transport_info& info, pjmedia_transport* transport)
+{
+ pjmedia_transport_info_init(&info);
+ pjmedia_transport_get_info(transport, &info);
+}
+
+bool isCompatible(const pj_sockaddr& addr, const pjmedia_transport_info& info)
+{
+ return info.sock_info.rtp_addr_name.addr.sa_family == addr.addr.sa_family;
+}
+
+}
+
+void TransportMap::addTransport(const MapRecord& r, const string& ip, Ice::Int port)
+{
+ assert(r.transport);
+ assert(r.source);
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ AddressPtr address = r.transport->localAddress();
+ assert(address);
+ unsigned tblIndex = address->port() / 1000;
+ unsigned arrayIndex = address->port() % 1000;
+
+ pj_sockaddr addr;
+ initAddr(addr, ip, port);
+ pjmedia_transport_info info;
+ pjmedia_transport* transport = r.transport->getTransport();
+ assert(transport);
+ initTransportInfo(info, transport);
+ if (!isCompatible(addr, info))
+ {
+ throw AsteriskSCF::Media::RTP::V1::InvalidAddress();
+ }
+
+ //
+ // Lazy initialization of port subtables.
+ //
+ if (mTable[tblIndex].size() < arrayIndex)
+ {
+ mTable[tblIndex].resize(arrayIndex);
+ }
+
+ MapRecord oldRecord = mTable[tblIndex][arrayIndex];
+ if (oldRecord.transport)
+ {
+ if (oldRecord.transport == r.transport && oldRecord.source == r.source && port == address->port())
+ {
+ //
+ // We do not have to do anything here because doing what
+ // comes next will not change anything. This can happen if
+ // setRtcpRemoteDetails is called in addition to
+ // setRemoteDetails.
+ //
+ return;
+ }
+ lg(Debug) << "Overwriting existing transport entry and detaching transport.";
+ pjmedia_transport* t = oldRecord.transport->getTransport();
+ pjmedia_transport_detach(t, reinterpret_cast<void*>(port));
+ }
+
+ mTable[tblIndex][arrayIndex] = r;
+ pj_status_t result = pjmedia_transport_attach(r.transport->getTransport(),
+ reinterpret_cast<void*>(port), &addr, &info.src_rtcp_name, pj_sockaddr_get_len(&addr), &TransportMap::receiveRTP,
+ &TransportMap::receiveRTCP);
+ if (result != PJ_SUCCESS)
+ {
+ //
+ // Do we revert the system or simply set to a sane initial
+ // state? We cannot really re-attach because we don't have all
+ // of the info at this point, so go with the sane initial
+ // state for now.
+ //
+ mTable[tblIndex][arrayIndex] = MapRecord();
+ throw AsteriskSCF::Media::RTP::V1::InvalidAddress();
+ }
+
+ incEntries();
+}
+
+void TransportMap::removeTransport(const MapRecord& r)
+{
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ AddressPtr address = r.transport->localAddress();
+ assert(address);
+ unsigned tblIndex = address->port() / 1000;
+ unsigned arrayIndex = address->port() % 1000;
+ if (mTable[tblIndex].size() < arrayIndex || !mTable[tblIndex][arrayIndex].transport)
+ {
+ lg(Debug) << "Attempting to remove a non-existant transport";
+ return;
+ }
+ MapRecord toRemove = mTable[tblIndex][arrayIndex];
+ if (toRemove.transport)
+ {
+ pjmedia_transport* t = toRemove.transport->getTransport();
+ if (t)
+ {
+ pjmedia_transport_detach(t, reinterpret_cast<void*>(address->port()));
+ }
+ }
+
+ mTable[tblIndex][arrayIndex] = MapRecord();
+ decEntries();
+}
+
+TransportMap& TransportMap::instance()
+{
+ return mInstance;
+}
+
+//
+// Instantiation of global single instance.
+//
+TransportMap TransportMap::mInstance;
+
+TransportMap::TransportMap()
+#ifndef _NDEBUG
+ : mCounter(0)
+#endif
+{
+}
+
+void TransportMap::incEntries()
+{
+#ifndef _NDEBUG
+ ++mCounter;
+ lg(Debug) << "Transport added: " << mCounter << " active transports";
+#endif
+}
+
+void TransportMap::decEntries()
+{
+#ifndef _NDEBUG
+ --mCounter;
+ lg(Debug) << "Transport removed: " << mCounter << " active transports";
+#endif
+}
+
+MapRecord TransportMap::get(const Ice::Int port)
+{
+ boost::shared_lock<boost::shared_mutex> lock(mLock);
+ unsigned tblIndex = port / 1000;
+ unsigned arrayIndex = port % 1000;
+
+ //
+ // Bounds check
+ //
+ assert(tblIndex >= 0 && tblIndex <= mTable.size());
+ if (mTable[tblIndex].empty() || mTable[tblIndex].size() < arrayIndex)
+ {
+ throw TransportNotFound();
+ }
+
+ if (!mTable[tblIndex][arrayIndex].transport)
+ {
+ assert(!mTable[tblIndex][arrayIndex].source);
+ throw TransportNotFound();
+ }
+ return mTable[tblIndex][arrayIndex];
+}
+
+void TransportMap::receiveRTP(void* userdata, void* packet, pj_ssize_t size)
+{
+ /* Ensure that no errors occurred when reading this packet in */
+ if (size < 0)
+ {
+ lg(Error) << "We attempted to read data from an RTP session but failed.";
+ return;
+ }
+
+ int portKey = reinterpret_cast<int>(userdata);
+ try
+ {
+ MapRecord transportRec = TransportMap::instance().get(portKey);
+ if (!transportRec.source->receiveRTP(userdata, packet, size))
+ {
+ TransportMap::instance().removeTransport(transportRec);
+ }
+ }
+ catch (const TransportNotFound&)
+ {
+ lg(Error) << "Packet received for unknown transport instance.";
+ }
+}
+
+void TransportMap::receiveRTCP(void* userdata, void* packet, pj_ssize_t size)
+{
+ /* Ensure that no errors occurred when reading this packet in */
+ if (size < 0)
+ {
+ lg(Error) << "We attempted to read data from an RTP session but failed.";
+ return;
+ }
+
+ int portKey = reinterpret_cast<int>(userdata);
+ try
+ {
+ MapRecord transportRec = TransportMap::instance().get(portKey);
+ if (!transportRec.source->receiveRTCP(userdata, packet, size))
+ {
+ TransportMap::instance().removeTransport(transportRec);
+ }
+ }
+ catch (const TransportNotFound&)
+ {
+ lg(Error) << "Packet received for unknown transport instance.";
+ }
+}
diff --git a/src/TransportMap.h b/src/TransportMap.h
new file mode 100755
index 0000000..dd5b9b3
--- /dev/null
+++ b/src/TransportMap.h
@@ -0,0 +1,91 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2010, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+
+#pragma once
+
+#include "PJMEDIATransport.h"
+#include "RTPSource.h"
+#include <vector>
+#include <array>
+#include <boost/thread.hpp>
+
+#include <pjlib.h>
+
+namespace AsteriskSCF
+{
+namespace PJMEDIARTP
+{
+
+struct MapRecord
+{
+ PJMEDIATransportPtr transport;
+ StreamSourceRTPImplPtr source;
+
+ MapRecord(const PJMEDIATransportPtr& t, const StreamSourceRTPImplPtr& s) :
+ transport(t), source(s)
+ {
+ }
+
+ MapRecord()
+ {
+ }
+};
+
+class TransportMap
+{
+public:
+
+ void addTransport(const MapRecord& transport, const std::string& address, Ice::Int port);
+ void removeTransport(const MapRecord& transport);
+
+ static TransportMap& instance();
+
+private:
+ //
+ // While the TransportTable results in a fixed allocation, it is only
+ // of 65 TransportArrays. The TransportArray's are grown on-demand.
+ // They will *not* shrink once they've been grown, but it is arguable
+ // that an RTP component that loads up will likely load up again!
+ //
+ typedef std::vector<MapRecord> TransportArray;
+ typedef std::array<TransportArray, 65> TransportTable;
+
+ boost::shared_mutex mLock;
+ TransportTable mTable;
+#ifndef _NDEBUG
+ unsigned long mCounter;
+#endif
+
+ static TransportMap mInstance;
+
+ TransportMap();
+
+ //
+ // Mostly for debugging... the bodies of these methods are
+ // conditionally compiled out if _NDEBUG is defined. I'd imagine a
+ // frisky optimizer would take the calls out entirely.
+ //
+ void incEntries();
+ void decEntries();
+
+ MapRecord get(const Ice::Int port);
+
+ static void receiveRTP(void* userData, void* packet, pj_ssize_t size);
+ static void receiveRTCP(void* userData, void* packet, pj_ssize_t size);
+};
+
+} /* End of namespace PJMEDIARTP */
+} /* End of namespace AsteriskSCF */
-----------------------------------------------------------------------
--
asterisk-scf/integration/media_rtp_pjmedia.git
More information about the asterisk-scf-commits
mailing list