[asterisk-scf-commits] asterisk-scf/release/media_rtp_pjmedia.git branch "master" updated.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Tue Aug 7 11:40:52 CDT 2012
branch "master" has been updated
via d12e4a2e848fe93d2dbe6275a5fb20a01c4740d0 (commit)
via d35deff9771baf9a17cee0cc10ffb1eb286c5d5e (commit)
via 78dd865f792d26aa6e750f6d19f6153211c3282b (commit)
via 9f20808efe50679ec12f187c003324519bbe5b54 (commit)
via a80ff0e6e50f11dfb1c0e67a318f9f239248903b (commit)
via 44e151d1c320b5e37990b0b4fdbe3650dcbd4114 (commit)
via 15a6cae74e650fdeec8945b283f5205410b7481f (commit)
via bf9ec7cdc51731b1f13db3421c7ae7f6a20831aa (commit)
via c89be9b7c37e4f61b365d8e620b6dadf297462f8 (commit)
via 696c728ff5cd8c3d3659e33ed492af68b4ae07ad (commit)
via d5146877151aaf690908b8710a030acf18a557d9 (commit)
via 33d85fe523555ae4ab637ca62f09da3b6b8a91c7 (commit)
via f0e6255ca9fe653b034055d82df3cd48319ad3dc (commit)
via 275d7d2461aee6c129f4f4d5e5bfa2e86b93166d (commit)
via d866db0ec2b66194d3f646b7c330959b21b8b3b4 (commit)
from 9bc18b0c49e8f0144fa1256aa4aba89ac7b932a4 (commit)
Summary of changes:
src/CMakeLists.txt | 2 +
src/ICETransport.cpp | 3 +-
src/ICETransport.h | 1 -
src/PJMEDIAEndpoint.cpp | 17 ++-
src/PJMEDIAEndpoint.h | 6 +-
src/PJMEDIAEnvironment.cpp | 11 +
src/PJMEDIAEnvironment.h | 16 ++
src/PJMEDIATransport.cpp | 5 +-
src/PJMEDIATransport.h | 5 +-
src/RTPSession.cpp | 31 +++-
src/RTPSink.cpp | 67 ++++++-
src/RTPSink.h | 6 +
src/RTPSource.cpp | 445 ++++++++++++++++++++++++--------------------
src/RTPSource.h | 22 ++-
src/SRTPTransport.cpp | 6 +-
src/SRTPTransport.h | 2 +-
src/TransportMap.cpp | 273 +++++++++++++++++++++++++++
src/TransportMap.h | 88 +++++++++
src/UDPTransport.cpp | 6 +-
src/UDPTransport.h | 2 +-
test/TestRTPpjmedia.cpp | 5 +
21 files changed, 779 insertions(+), 240 deletions(-)
create mode 100755 src/TransportMap.cpp
create mode 100755 src/TransportMap.h
- Log -----------------------------------------------------------------
commit d12e4a2e848fe93d2dbe6275a5fb20a01c4740d0
Author: Brent Eagles <beagles at digium.com>
Date: Tue Aug 7 14:01:53 2012 -0230
Fix a bogus code reordering that would cause pjsip code to be called from a
timer thread before it was registered with pjlib.
diff --git a/src/RTPSource.cpp b/src/RTPSource.cpp
index 2502806..12b8003 100644
--- a/src/RTPSource.cpp
+++ b/src/RTPSource.cpp
@@ -110,13 +110,6 @@ public:
void runTimerTask()
{
- if (isShutdown())
- {
- mSessionAdapter.reset();
- mTransport.reset();
- return;
- }
-
if (pj_thread_is_registered() == PJ_FALSE)
{
pj_thread_t *thread;
@@ -124,6 +117,13 @@ public:
pj_status_t status = pj_thread_register("ICE Thread", mThreadDescriptor->mDesc, &thread);
assert(status == PJ_SUCCESS);
}
+
+ if (isShutdown())
+ {
+ mSessionAdapter.reset();
+ mTransport.reset();
+ return;
+ }
void *packet;
int packet_size;
commit d35deff9771baf9a17cee0cc10ffb1eb286c5d5e
Author: Brent Eagles <beagles at digium.com>
Date: Wed Jul 25 17:48:29 2012 -0230
The component now uses a single timer object for all RTP sources. This
shouldn't be a huge issue, although the timer worker thread will be very busy on
RTP components with lots of sessions. An improvement would be to have a pool of
timer instances and round-robin through them.
diff --git a/src/PJMEDIAEnvironment.cpp b/src/PJMEDIAEnvironment.cpp
index 89f7059..5fe25dc 100644
--- a/src/PJMEDIAEnvironment.cpp
+++ b/src/PJMEDIAEnvironment.cpp
@@ -48,8 +48,9 @@ PJMEDIAEnvironment::PJMEDIAEnvironment(const PJLIBConfigurationPtr& libCfg,
const RTPConfigurationPtr& configObject) :
mPJLIBConfig(libCfg),
mConfiguration(configObject),
- mCachingPool(new pj_caching_pool),
- mIOQueue(0)
+ mIOQueue(0),
+ mTimer(new IceUtil::Timer),
+ mCachingPool(new pj_caching_pool)
{
//
// I find this practice a little sketchy since the pointers that might be retrieve through the accessors *must*
diff --git a/src/PJMEDIAEnvironment.h b/src/PJMEDIAEnvironment.h
index 418d48e..774b17f 100644
--- a/src/PJMEDIAEnvironment.h
+++ b/src/PJMEDIAEnvironment.h
@@ -19,6 +19,7 @@
#include "Configuration.h"
#include <Ice/PropertiesF.h>
+#include <IceUtil/Timer.h>
#include <string>
#include <boost/shared_ptr.hpp>
@@ -117,6 +118,11 @@ public:
return mIOQueue;
}
+ IceUtil::TimerPtr timer()
+ {
+ return mTimer;
+ }
+
/**
* Create an instance of the object based on the Ice properties.
*/
@@ -130,6 +136,7 @@ private:
pj_pool_factory* mPoolFactory;
pj_pool_t* mMemoryPool;
pj_ioqueue_t* mIOQueue;
+ IceUtil::TimerPtr mTimer;
boost::shared_ptr<pj_caching_pool> mCachingPool;
PJMEDIAEnvironment(const PJLIBConfigurationPtr& libConfig,
diff --git a/src/RTPSession.cpp b/src/RTPSession.cpp
index fca1d53..17bbd93 100644
--- a/src/RTPSession.cpp
+++ b/src/RTPSession.cpp
@@ -1153,7 +1153,8 @@ RTPSessionPrx RTPSessionImpl::activate(
mTransport,
mId,
StreamSourceRTPPrx::uncheckedCast(mAdapter->createDirectProxy(sourceId)),
- StreamSinkRTPPrx::uncheckedCast(mAdapter->createDirectProxy(sinkId)));
+ StreamSinkRTPPrx::uncheckedCast(mAdapter->createDirectProxy(sinkId)),
+ mEnvironment->timer());
mStreamSink = new StreamSinkRTPImpl(
mSessionAdapter,
diff --git a/src/RTPSource.cpp b/src/RTPSource.cpp
index fbae703..2502806 100644
--- a/src/RTPSource.cpp
+++ b/src/RTPSource.cpp
@@ -201,7 +201,8 @@ public:
const PJMEDIATransportPtr& transport,
const string& parentSessionId,
const StreamSourceRTPPrx& source,
- const StreamSinkRTPPrx& sink
+ const StreamSinkRTPPrx& sink,
+ const IceUtil::TimerPtr& timer
);
/**
@@ -277,10 +278,12 @@ StreamSourceRTPImplPriv::StreamSourceRTPImplPriv(const SessionAdapterPtr& sessio
const PJMEDIATransportPtr& transport,
const string& sessionId,
const StreamSourceRTPPrx& source,
- const StreamSinkRTPPrx& sink) :
+ const StreamSinkRTPPrx& sink,
+ const IceUtil::TimerPtr& timer) :
mSessionAdapter(session), mTransport(transport),
mSourceStateItem(new RTPStreamSourceStateItem),
mSessionId(sessionId),
+ mTimer(timer),
mSource(source),
mSink(sink),
mThreadDescriptor(new ThreadDescWrapper),
@@ -296,10 +299,6 @@ StreamSourceRTPImplPriv::StreamSourceRTPImplPriv(const SessionAdapterPtr& sessio
*/
StreamSourceRTPImplPriv::~StreamSourceRTPImplPriv()
{
- if (mTimer)
- {
- mTimer->destroy();
- }
}
/**
@@ -308,8 +307,9 @@ StreamSourceRTPImplPriv::~StreamSourceRTPImplPriv()
StreamSourceRTPImpl::StreamSourceRTPImpl(const SessionAdapterPtr& session,
const PJMEDIATransportPtr& transport, const string& sessionId,
const StreamSourceRTPPrx& source,
- const StreamSinkRTPPrx& sink) :
- mImpl(new StreamSourceRTPImplPriv(session, transport, sessionId, source, sink))
+ const StreamSinkRTPPrx& sink,
+ const IceUtil::TimerPtr& timer) :
+ mImpl(new StreamSourceRTPImplPriv(session, transport, sessionId, source, sink, timer))
{
}
@@ -483,7 +483,7 @@ void StreamSourceRTPImpl::setRemoteRtcpDetails(const std::string& address, Ice::
{
TransportMap::instance().addTransport(MapRecord(mImpl->mTransport, this), address, port, true);
boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
- if (!mImpl->mTimer && (mImpl->mTimer = new IceUtil::Timer()))
+ if (mImpl->mTimer)
{
if (mImpl->mTransmission)
{
@@ -564,11 +564,6 @@ void StreamSourceRTPImpl::destroy()
mImpl->mTelephonyEventSource = 0;
mImpl->mSessionAdapter.reset();
// Destroy the RTCP transmission timer if it exists
- if (mImpl->mTimer)
- {
- mImpl->mTimer->destroy();
- mImpl->mTimer = 0;
- }
if (mImpl->mTransmission)
{
mImpl->mTransmission->shutdown();
diff --git a/src/RTPSource.h b/src/RTPSource.h
index e84029b..8c4ef63 100644
--- a/src/RTPSource.h
+++ b/src/RTPSource.h
@@ -12,6 +12,7 @@
#include "SessionAdapter.h"
#include <boost/shared_ptr.hpp>
#include <IceUtil/Handle.h>
+#include <IceUtil/Timer.h>
#include "RTPTelephonyEventSource.h"
/**
@@ -29,7 +30,8 @@ public:
const AsteriskSCF::PJMEDIARTP::PJMEDIATransportPtr& transport,
const std::string& parentSessionId,
const AsteriskSCF::Media::RTP::V1::StreamSourceRTPPrx& source,
- const AsteriskSCF::Media::RTP::V1::StreamSinkRTPPrx& sink);
+ const AsteriskSCF::Media::RTP::V1::StreamSinkRTPPrx& sink,
+ const IceUtil::TimerPtr& mTimer);
void addSink(const AsteriskSCF::System::V1::OperationContextPtr&, const AsteriskSCF::Media::V1::StreamSinkPrx&, const Ice::Current&);
void removeSink(const AsteriskSCF::System::V1::OperationContextPtr&, const AsteriskSCF::Media::V1::StreamSinkPrx&, const Ice::Current&);
commit 78dd865f792d26aa6e750f6d19f6153211c3282b
Author: Brent Eagles <beagles at digium.com>
Date: Tue Jul 24 12:16:37 2012 -0230
Experimental changes aimed at constraining resource consumption when there are
many concurrent sessions.
- Instantiate only one ioqueue per component
- Instantiate a single media endpoint and share it among the sessions. The
endpoint is the source of thread allocations, so constraining how many of these
are created is essential.
diff --git a/src/PJMEDIAEndpoint.cpp b/src/PJMEDIAEndpoint.cpp
index 97092ec..1ba53fc 100644
--- a/src/PJMEDIAEndpoint.cpp
+++ b/src/PJMEDIAEndpoint.cpp
@@ -23,20 +23,31 @@ using namespace AsteriskSCF::PJMEDIARTP;
using namespace AsteriskSCF::System::V1;
using namespace AsteriskSCF::PJUtil;
+const int MaxThreads = 16;
+
+boost::mutex PJMEDIAEndpoint::mEndpointMutex;
+PJMEDIAEndpointPtr PJMEDIAEndpoint::mEndpointInstance;
+
PJMEDIAEndpoint::~PJMEDIAEndpoint()
{
pjmedia_endpt_destroy(mEndpoint);
}
-PJMEDIAEndpointPtr AsteriskSCF::PJMEDIARTP::PJMEDIAEndpoint::create(const PJMEDIAEnvironmentPtr& env)
+PJMEDIAEndpointPtr AsteriskSCF::PJMEDIARTP::PJMEDIAEndpoint::get(const PJMEDIAEnvironmentPtr& env)
{
+ boost::lock_guard<boost::mutex> lock(mEndpointMutex);
+ if (mEndpointInstance)
+ {
+ return mEndpointInstance;
+ }
pjmedia_endpt* t;
- pj_status_t result = pjmedia_endpt_create(env->poolFactory(), 0, 1, &t);
+ pj_status_t result = pjmedia_endpt_create(env->poolFactory(), env->ioQueue(), MaxThreads, &t);
if (fail(result))
{
throw InternalInitializationException("Unable to create media endpoint!");
}
- return PJMEDIAEndpointPtr(new PJMEDIAEndpoint(t));
+ mEndpointInstance.reset(new PJMEDIAEndpoint(t));
+ return mEndpointInstance;
}
PJMEDIAEndpoint::PJMEDIAEndpoint(pjmedia_endpt* endpt) :
diff --git a/src/PJMEDIAEndpoint.h b/src/PJMEDIAEndpoint.h
index 93c614e..674a82d 100644
--- a/src/PJMEDIAEndpoint.h
+++ b/src/PJMEDIAEndpoint.h
@@ -18,6 +18,7 @@
#include "PJMEDIAEnvironment.h"
#include <boost/shared_ptr.hpp>
+#include <boost/thread/mutex.hpp>
//
// forward declarations.
@@ -43,13 +44,16 @@ public:
return mEndpoint;
}
- static PJMEDIAEndpointPtr create(const PJMEDIAEnvironmentPtr& environ);
+ static PJMEDIAEndpointPtr get(const PJMEDIAEnvironmentPtr& environ);
private:
pjmedia_endpt* mEndpoint;
PJMEDIAEndpoint(pjmedia_endpt* endpoint);
+ static boost::mutex mEndpointMutex;
+ static PJMEDIAEndpointPtr mEndpointInstance;
+
//
// Hidden and unimplemented.
//
diff --git a/src/PJMEDIAEnvironment.cpp b/src/PJMEDIAEnvironment.cpp
index 2983073..89f7059 100644
--- a/src/PJMEDIAEnvironment.cpp
+++ b/src/PJMEDIAEnvironment.cpp
@@ -27,6 +27,14 @@ using namespace AsteriskSCF::PJMEDIARTP;
using namespace AsteriskSCF::System::V1;
using namespace AsteriskSCF::PJUtil;
+PJMEDIAEnvironment::~PJMEDIAEnvironment()
+{
+ if (mIOQueue)
+ {
+ pj_ioqueue_destroy(mIOQueue);
+ }
+}
+
//
// The main work of creating the various objects is done by the factory, not the PJMEDIAEnvironment constructor.
//
@@ -40,7 +48,8 @@ PJMEDIAEnvironment::PJMEDIAEnvironment(const PJLIBConfigurationPtr& libCfg,
const RTPConfigurationPtr& configObject) :
mPJLIBConfig(libCfg),
mConfiguration(configObject),
- mCachingPool(new pj_caching_pool)
+ mCachingPool(new pj_caching_pool),
+ mIOQueue(0)
{
//
// I find this practice a little sketchy since the pointers that might be retrieve through the accessors *must*
@@ -53,4 +62,5 @@ PJMEDIAEnvironment::PJMEDIAEnvironment(const PJLIBConfigurationPtr& libCfg,
// TODO: should these values come from configuration.
//
mMemoryPool = pj_pool_create(mPoolFactory, "media_rtp_pjmedia", 1024, 1024, 0);
+ pj_status_t ioqueueStatus = pj_ioqueue_create(mMemoryPool, PJ_IOQUEUE_MAX_HANDLES, &mIOQueue);
}
diff --git a/src/PJMEDIAEnvironment.h b/src/PJMEDIAEnvironment.h
index 67e604a..418d48e 100644
--- a/src/PJMEDIAEnvironment.h
+++ b/src/PJMEDIAEnvironment.h
@@ -29,6 +29,7 @@ struct pj_pool_factory;
struct pjmedia_endpt;
struct pj_pool_t;
struct pj_caching_pool;
+struct pj_ioqueue_t;
namespace AsteriskSCF
{
@@ -58,6 +59,7 @@ typedef boost::shared_ptr<PJMEDIAEnvironment> PJMEDIAEnvironmentPtr;
class PJMEDIAEnvironment
{
public:
+ virtual ~PJMEDIAEnvironment();
/**
* Get generic configuration object.
@@ -110,6 +112,11 @@ public:
return mMemoryPool;
}
+ pj_ioqueue_t* ioQueue() const
+ {
+ return mIOQueue;
+ }
+
/**
* Create an instance of the object based on the Ice properties.
*/
@@ -122,10 +129,12 @@ private:
pj_pool_factory* mPoolFactory;
pj_pool_t* mMemoryPool;
+ pj_ioqueue_t* mIOQueue;
boost::shared_ptr<pj_caching_pool> mCachingPool;
PJMEDIAEnvironment(const PJLIBConfigurationPtr& libConfig,
const RTPConfigurationPtr& configObject);
+
};
} /* End of namespace PJMEDIARTP */
diff --git a/src/RTPSession.cpp b/src/RTPSession.cpp
index ce5ba9b..fca1d53 100644
--- a/src/RTPSession.cpp
+++ b/src/RTPSession.cpp
@@ -515,7 +515,7 @@ RTPSessionImpl::RTPSessionImpl(const Ice::ObjectAdapterPtr& adapter,
const ConfigurationServiceImplPtr& configurationService) :
mOperationContextCache(OperationContextCache::create(DEFAULT_TTL_SECONDS)),
mEnvironment(env),
- mEndpoint(PJMEDIAEndpoint::create(env)),
+ mEndpoint(PJMEDIAEndpoint::get(env)),
mId(id),
mAdapter(adapter),
mFormats(params->formats),
@@ -574,7 +574,7 @@ RTPSessionImpl::RTPSessionImpl(const Ice::ObjectAdapterPtr& adapter,
const RTPReplicationContextPtr& replicationContext,
const ConfigurationServiceImplPtr& configurationService) :
mEnvironment(env),
- mEndpoint(PJMEDIAEndpoint::create(env)),
+ mEndpoint(PJMEDIAEndpoint::get(env)),
mId(sessionIdentity),
mAdapter(adapter),
mFormats(formats),
commit 9f20808efe50679ec12f187c003324519bbe5b54
Author: Brent Eagles <beagles at digium.com>
Date: Tue Jul 24 12:27:59 2012 -0230
Revert "Experimental changes aimed at constraining resource consumption when..."
This reverts commit a80ff0e6e50f11dfb1c0e67a318f9f239248903b.
diff --git a/src/PJMEDIAEndpoint.cpp b/src/PJMEDIAEndpoint.cpp
index 1ba53fc..97092ec 100644
--- a/src/PJMEDIAEndpoint.cpp
+++ b/src/PJMEDIAEndpoint.cpp
@@ -23,31 +23,20 @@ using namespace AsteriskSCF::PJMEDIARTP;
using namespace AsteriskSCF::System::V1;
using namespace AsteriskSCF::PJUtil;
-const int MaxThreads = 16;
-
-boost::mutex PJMEDIAEndpoint::mEndpointMutex;
-PJMEDIAEndpointPtr PJMEDIAEndpoint::mEndpointInstance;
-
PJMEDIAEndpoint::~PJMEDIAEndpoint()
{
pjmedia_endpt_destroy(mEndpoint);
}
-PJMEDIAEndpointPtr AsteriskSCF::PJMEDIARTP::PJMEDIAEndpoint::get(const PJMEDIAEnvironmentPtr& env)
+PJMEDIAEndpointPtr AsteriskSCF::PJMEDIARTP::PJMEDIAEndpoint::create(const PJMEDIAEnvironmentPtr& env)
{
- boost::lock_guard<boost::mutex> lock(mEndpointMutex);
- if (mEndpointInstance)
- {
- return mEndpointInstance;
- }
pjmedia_endpt* t;
- pj_status_t result = pjmedia_endpt_create(env->poolFactory(), env->ioQueue(), MaxThreads, &t);
+ pj_status_t result = pjmedia_endpt_create(env->poolFactory(), 0, 1, &t);
if (fail(result))
{
throw InternalInitializationException("Unable to create media endpoint!");
}
- mEndpointInstance.reset(new PJMEDIAEndpoint(t));
- return mEndpointInstance;
+ return PJMEDIAEndpointPtr(new PJMEDIAEndpoint(t));
}
PJMEDIAEndpoint::PJMEDIAEndpoint(pjmedia_endpt* endpt) :
diff --git a/src/PJMEDIAEndpoint.h b/src/PJMEDIAEndpoint.h
index 674a82d..93c614e 100644
--- a/src/PJMEDIAEndpoint.h
+++ b/src/PJMEDIAEndpoint.h
@@ -18,7 +18,6 @@
#include "PJMEDIAEnvironment.h"
#include <boost/shared_ptr.hpp>
-#include <boost/thread/mutex.hpp>
//
// forward declarations.
@@ -44,16 +43,13 @@ public:
return mEndpoint;
}
- static PJMEDIAEndpointPtr get(const PJMEDIAEnvironmentPtr& environ);
+ static PJMEDIAEndpointPtr create(const PJMEDIAEnvironmentPtr& environ);
private:
pjmedia_endpt* mEndpoint;
PJMEDIAEndpoint(pjmedia_endpt* endpoint);
- static boost::mutex mEndpointMutex;
- static PJMEDIAEndpointPtr mEndpointInstance;
-
//
// Hidden and unimplemented.
//
diff --git a/src/PJMEDIAEnvironment.cpp b/src/PJMEDIAEnvironment.cpp
index 89f7059..2983073 100644
--- a/src/PJMEDIAEnvironment.cpp
+++ b/src/PJMEDIAEnvironment.cpp
@@ -27,14 +27,6 @@ using namespace AsteriskSCF::PJMEDIARTP;
using namespace AsteriskSCF::System::V1;
using namespace AsteriskSCF::PJUtil;
-PJMEDIAEnvironment::~PJMEDIAEnvironment()
-{
- if (mIOQueue)
- {
- pj_ioqueue_destroy(mIOQueue);
- }
-}
-
//
// The main work of creating the various objects is done by the factory, not the PJMEDIAEnvironment constructor.
//
@@ -48,8 +40,7 @@ PJMEDIAEnvironment::PJMEDIAEnvironment(const PJLIBConfigurationPtr& libCfg,
const RTPConfigurationPtr& configObject) :
mPJLIBConfig(libCfg),
mConfiguration(configObject),
- mCachingPool(new pj_caching_pool),
- mIOQueue(0)
+ mCachingPool(new pj_caching_pool)
{
//
// I find this practice a little sketchy since the pointers that might be retrieve through the accessors *must*
@@ -62,5 +53,4 @@ PJMEDIAEnvironment::PJMEDIAEnvironment(const PJLIBConfigurationPtr& libCfg,
// TODO: should these values come from configuration.
//
mMemoryPool = pj_pool_create(mPoolFactory, "media_rtp_pjmedia", 1024, 1024, 0);
- pj_status_t ioqueueStatus = pj_ioqueue_create(mMemoryPool, PJ_IOQUEUE_MAX_HANDLES, &mIOQueue);
}
diff --git a/src/PJMEDIAEnvironment.h b/src/PJMEDIAEnvironment.h
index 418d48e..67e604a 100644
--- a/src/PJMEDIAEnvironment.h
+++ b/src/PJMEDIAEnvironment.h
@@ -29,7 +29,6 @@ struct pj_pool_factory;
struct pjmedia_endpt;
struct pj_pool_t;
struct pj_caching_pool;
-struct pj_ioqueue_t;
namespace AsteriskSCF
{
@@ -59,7 +58,6 @@ typedef boost::shared_ptr<PJMEDIAEnvironment> PJMEDIAEnvironmentPtr;
class PJMEDIAEnvironment
{
public:
- virtual ~PJMEDIAEnvironment();
/**
* Get generic configuration object.
@@ -112,11 +110,6 @@ public:
return mMemoryPool;
}
- pj_ioqueue_t* ioQueue() const
- {
- return mIOQueue;
- }
-
/**
* Create an instance of the object based on the Ice properties.
*/
@@ -129,12 +122,10 @@ private:
pj_pool_factory* mPoolFactory;
pj_pool_t* mMemoryPool;
- pj_ioqueue_t* mIOQueue;
boost::shared_ptr<pj_caching_pool> mCachingPool;
PJMEDIAEnvironment(const PJLIBConfigurationPtr& libConfig,
const RTPConfigurationPtr& configObject);
-
};
} /* End of namespace PJMEDIARTP */
diff --git a/src/RTPSession.cpp b/src/RTPSession.cpp
index fca1d53..ce5ba9b 100644
--- a/src/RTPSession.cpp
+++ b/src/RTPSession.cpp
@@ -515,7 +515,7 @@ RTPSessionImpl::RTPSessionImpl(const Ice::ObjectAdapterPtr& adapter,
const ConfigurationServiceImplPtr& configurationService) :
mOperationContextCache(OperationContextCache::create(DEFAULT_TTL_SECONDS)),
mEnvironment(env),
- mEndpoint(PJMEDIAEndpoint::get(env)),
+ mEndpoint(PJMEDIAEndpoint::create(env)),
mId(id),
mAdapter(adapter),
mFormats(params->formats),
@@ -574,7 +574,7 @@ RTPSessionImpl::RTPSessionImpl(const Ice::ObjectAdapterPtr& adapter,
const RTPReplicationContextPtr& replicationContext,
const ConfigurationServiceImplPtr& configurationService) :
mEnvironment(env),
- mEndpoint(PJMEDIAEndpoint::get(env)),
+ mEndpoint(PJMEDIAEndpoint::create(env)),
mId(sessionIdentity),
mAdapter(adapter),
mFormats(formats),
commit a80ff0e6e50f11dfb1c0e67a318f9f239248903b
Author: Brent Eagles <beagles at digium.com>
Date: Tue Jul 24 12:16:37 2012 -0230
Experimental changes aimed at constraining resource consumption when there are
many concurrent sessions.
- Instantiate only one ioqueue per component
- Instantiate a single media endpoint and share it among the sessions. The
endpoint is the source of thread allocations, so constraining how many of these
are created is essential.
diff --git a/src/PJMEDIAEndpoint.cpp b/src/PJMEDIAEndpoint.cpp
index 97092ec..1ba53fc 100644
--- a/src/PJMEDIAEndpoint.cpp
+++ b/src/PJMEDIAEndpoint.cpp
@@ -23,20 +23,31 @@ using namespace AsteriskSCF::PJMEDIARTP;
using namespace AsteriskSCF::System::V1;
using namespace AsteriskSCF::PJUtil;
+const int MaxThreads = 16;
+
+boost::mutex PJMEDIAEndpoint::mEndpointMutex;
+PJMEDIAEndpointPtr PJMEDIAEndpoint::mEndpointInstance;
+
PJMEDIAEndpoint::~PJMEDIAEndpoint()
{
pjmedia_endpt_destroy(mEndpoint);
}
-PJMEDIAEndpointPtr AsteriskSCF::PJMEDIARTP::PJMEDIAEndpoint::create(const PJMEDIAEnvironmentPtr& env)
+PJMEDIAEndpointPtr AsteriskSCF::PJMEDIARTP::PJMEDIAEndpoint::get(const PJMEDIAEnvironmentPtr& env)
{
+ boost::lock_guard<boost::mutex> lock(mEndpointMutex);
+ if (mEndpointInstance)
+ {
+ return mEndpointInstance;
+ }
pjmedia_endpt* t;
- pj_status_t result = pjmedia_endpt_create(env->poolFactory(), 0, 1, &t);
+ pj_status_t result = pjmedia_endpt_create(env->poolFactory(), env->ioQueue(), MaxThreads, &t);
if (fail(result))
{
throw InternalInitializationException("Unable to create media endpoint!");
}
- return PJMEDIAEndpointPtr(new PJMEDIAEndpoint(t));
+ mEndpointInstance.reset(new PJMEDIAEndpoint(t));
+ return mEndpointInstance;
}
PJMEDIAEndpoint::PJMEDIAEndpoint(pjmedia_endpt* endpt) :
diff --git a/src/PJMEDIAEndpoint.h b/src/PJMEDIAEndpoint.h
index 93c614e..674a82d 100644
--- a/src/PJMEDIAEndpoint.h
+++ b/src/PJMEDIAEndpoint.h
@@ -18,6 +18,7 @@
#include "PJMEDIAEnvironment.h"
#include <boost/shared_ptr.hpp>
+#include <boost/thread/mutex.hpp>
//
// forward declarations.
@@ -43,13 +44,16 @@ public:
return mEndpoint;
}
- static PJMEDIAEndpointPtr create(const PJMEDIAEnvironmentPtr& environ);
+ static PJMEDIAEndpointPtr get(const PJMEDIAEnvironmentPtr& environ);
private:
pjmedia_endpt* mEndpoint;
PJMEDIAEndpoint(pjmedia_endpt* endpoint);
+ static boost::mutex mEndpointMutex;
+ static PJMEDIAEndpointPtr mEndpointInstance;
+
//
// Hidden and unimplemented.
//
diff --git a/src/PJMEDIAEnvironment.cpp b/src/PJMEDIAEnvironment.cpp
index 2983073..89f7059 100644
--- a/src/PJMEDIAEnvironment.cpp
+++ b/src/PJMEDIAEnvironment.cpp
@@ -27,6 +27,14 @@ using namespace AsteriskSCF::PJMEDIARTP;
using namespace AsteriskSCF::System::V1;
using namespace AsteriskSCF::PJUtil;
+PJMEDIAEnvironment::~PJMEDIAEnvironment()
+{
+ if (mIOQueue)
+ {
+ pj_ioqueue_destroy(mIOQueue);
+ }
+}
+
//
// The main work of creating the various objects is done by the factory, not the PJMEDIAEnvironment constructor.
//
@@ -40,7 +48,8 @@ PJMEDIAEnvironment::PJMEDIAEnvironment(const PJLIBConfigurationPtr& libCfg,
const RTPConfigurationPtr& configObject) :
mPJLIBConfig(libCfg),
mConfiguration(configObject),
- mCachingPool(new pj_caching_pool)
+ mCachingPool(new pj_caching_pool),
+ mIOQueue(0)
{
//
// I find this practice a little sketchy since the pointers that might be retrieve through the accessors *must*
@@ -53,4 +62,5 @@ PJMEDIAEnvironment::PJMEDIAEnvironment(const PJLIBConfigurationPtr& libCfg,
// TODO: should these values come from configuration.
//
mMemoryPool = pj_pool_create(mPoolFactory, "media_rtp_pjmedia", 1024, 1024, 0);
+ pj_status_t ioqueueStatus = pj_ioqueue_create(mMemoryPool, PJ_IOQUEUE_MAX_HANDLES, &mIOQueue);
}
diff --git a/src/PJMEDIAEnvironment.h b/src/PJMEDIAEnvironment.h
index 67e604a..418d48e 100644
--- a/src/PJMEDIAEnvironment.h
+++ b/src/PJMEDIAEnvironment.h
@@ -29,6 +29,7 @@ struct pj_pool_factory;
struct pjmedia_endpt;
struct pj_pool_t;
struct pj_caching_pool;
+struct pj_ioqueue_t;
namespace AsteriskSCF
{
@@ -58,6 +59,7 @@ typedef boost::shared_ptr<PJMEDIAEnvironment> PJMEDIAEnvironmentPtr;
class PJMEDIAEnvironment
{
public:
+ virtual ~PJMEDIAEnvironment();
/**
* Get generic configuration object.
@@ -110,6 +112,11 @@ public:
return mMemoryPool;
}
+ pj_ioqueue_t* ioQueue() const
+ {
+ return mIOQueue;
+ }
+
/**
* Create an instance of the object based on the Ice properties.
*/
@@ -122,10 +129,12 @@ private:
pj_pool_factory* mPoolFactory;
pj_pool_t* mMemoryPool;
+ pj_ioqueue_t* mIOQueue;
boost::shared_ptr<pj_caching_pool> mCachingPool;
PJMEDIAEnvironment(const PJLIBConfigurationPtr& libConfig,
const RTPConfigurationPtr& configObject);
+
};
} /* End of namespace PJMEDIARTP */
diff --git a/src/RTPSession.cpp b/src/RTPSession.cpp
index ce5ba9b..fca1d53 100644
--- a/src/RTPSession.cpp
+++ b/src/RTPSession.cpp
@@ -515,7 +515,7 @@ RTPSessionImpl::RTPSessionImpl(const Ice::ObjectAdapterPtr& adapter,
const ConfigurationServiceImplPtr& configurationService) :
mOperationContextCache(OperationContextCache::create(DEFAULT_TTL_SECONDS)),
mEnvironment(env),
- mEndpoint(PJMEDIAEndpoint::create(env)),
+ mEndpoint(PJMEDIAEndpoint::get(env)),
mId(id),
mAdapter(adapter),
mFormats(params->formats),
@@ -574,7 +574,7 @@ RTPSessionImpl::RTPSessionImpl(const Ice::ObjectAdapterPtr& adapter,
const RTPReplicationContextPtr& replicationContext,
const ConfigurationServiceImplPtr& configurationService) :
mEnvironment(env),
- mEndpoint(PJMEDIAEndpoint::create(env)),
+ mEndpoint(PJMEDIAEndpoint::get(env)),
mId(sessionIdentity),
mAdapter(adapter),
mFormats(formats),
commit 44e151d1c320b5e37990b0b4fdbe3650dcbd4114
Author: Brent Eagles <beagles at digium.com>
Date: Tue Jul 10 14:24:14 2012 -0230
Make sure that the structure being handed to the pjlib api for thread
registration is *clean* for the given thread.
diff --git a/src/RTPSource.cpp b/src/RTPSource.cpp
index eddd36e..fbae703 100644
--- a/src/RTPSource.cpp
+++ b/src/RTPSource.cpp
@@ -78,6 +78,11 @@ public:
* pjthread thread description information, must persist for the life of the thread
*/
pj_thread_desc mDesc;
+
+ void reset()
+ {
+ memset(&mDesc, 0, sizeof mDesc);
+ }
};
/**
@@ -115,6 +120,7 @@ public:
if (pj_thread_is_registered() == PJ_FALSE)
{
pj_thread_t *thread;
+ mThreadDescriptor->reset();
pj_status_t status = pj_thread_register("ICE Thread", mThreadDescriptor->mDesc, &thread);
assert(status == PJ_SUCCESS);
}
commit 15a6cae74e650fdeec8945b283f5205410b7481f
Author: Brent Eagles <beagles at digium.com>
Date: Tue Jul 10 14:02:11 2012 -0230
Add timer and timer task to the cleanup process.
diff --git a/src/RTPSource.cpp b/src/RTPSource.cpp
index c4900d9..eddd36e 100644
--- a/src/RTPSource.cpp
+++ b/src/RTPSource.cpp
@@ -105,6 +105,13 @@ public:
void runTimerTask()
{
+ if (isShutdown())
+ {
+ mSessionAdapter.reset();
+ mTransport.reset();
+ return;
+ }
+
if (pj_thread_is_registered() == PJ_FALSE)
{
pj_thread_t *thread;
@@ -115,13 +122,6 @@ public:
void *packet;
int packet_size;
- if (isShutdown())
- {
- mSessionAdapter.reset();
- mTransport.reset();
- return;
- }
-
pjmedia_rtcp_build_rtcp(mSessionAdapter->getRtcpSession(), &packet, &packet_size);
pjmedia_transport_send_rtcp(mTransport->getTransport(), packet, packet_size);
@@ -244,6 +244,8 @@ public:
*/
RTPTelephonyEventSourcePtr mTelephonyEventSource;
+ RtcpTransmissionPtr mTransmission;
+
/**
* Lock that protects information contained.
*/
@@ -474,12 +476,16 @@ void StreamSourceRTPImpl::setRemoteDetails(const string& address, Ice::Int port)
void StreamSourceRTPImpl::setRemoteRtcpDetails(const std::string& address, Ice::Int port)
{
TransportMap::instance().addTransport(MapRecord(mImpl->mTransport, this), address, port, true);
+ boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
if (!mImpl->mTimer && (mImpl->mTimer = new IceUtil::Timer()))
{
- RtcpTransmissionPtr transmission;
- if ((transmission = new RtcpTransmission(mImpl->mSessionAdapter, mImpl->mSink, mImpl->mTransport, mImpl->mThreadDescriptor)))
+ if (mImpl->mTransmission)
+ {
+ mImpl->mTransmission->shutdown();
+ }
+ if ((mImpl->mTransmission = new RtcpTransmission(mImpl->mSessionAdapter, mImpl->mSink, mImpl->mTransport, mImpl->mThreadDescriptor)))
{
- mImpl->mTimer->scheduleRepeated(transmission, IceUtil::Time::milliSeconds(PJMEDIA_RTCP_INTERVAL));
+ mImpl->mTimer->scheduleRepeated(mImpl->mTransmission, IceUtil::Time::milliSeconds(PJMEDIA_RTCP_INTERVAL));
}
}
}
@@ -557,6 +563,10 @@ void StreamSourceRTPImpl::destroy()
mImpl->mTimer->destroy();
mImpl->mTimer = 0;
}
+ if (mImpl->mTransmission)
+ {
+ mImpl->mTransmission->shutdown();
+ }
}
TransportMap::instance().removeTransport(MapRecord(transport, this));
}
commit bf9ec7cdc51731b1f13db3421c7ae7f6a20831aa
Author: Brent Eagles <beagles at digium.com>
Date: Tue Jul 10 12:20:48 2012 -0230
Altering the concrete transport implementations to hold a counted reference to
their parent endpoint. It is cleaner, or at least clearer, for the servant
to lose its reference to the endpoint when it is destroyed, along with its references
to everything else. However, the order of destruction is unclear, and the transport
objects that might still be active still need the endpoint to exist. The transports having
their own reference resolves this issue. The endpoint does not have a reference to them,
so there is no circular reference issue, and as long as the transport is cleaned up
properly (which it should be at the moment), the endpoint will disappear as well.
diff --git a/src/ICETransport.cpp b/src/ICETransport.cpp
index b36c996..36e91a1 100644
--- a/src/ICETransport.cpp
+++ b/src/ICETransport.cpp
@@ -803,8 +803,7 @@ ICETransportPtr ICETransport::create(const PJMEDIAEndpointPtr& ep, const PJMEDIA
}
ICETransport::ICETransport(const PJMEDIAEndpointPtr& ep, const PJMEDIAEnvironmentPtr& configObject) :
- PJMEDIATransport(0),
- mEndpoint(ep),
+ PJMEDIATransport(ep, 0),
mConfig(configObject),
mEnableRTCP(false)
{
diff --git a/src/ICETransport.h b/src/ICETransport.h
index 7a30cca..31d4060 100644
--- a/src/ICETransport.h
+++ b/src/ICETransport.h
@@ -70,7 +70,6 @@ private:
PJICECallbackPtr mCallback;
PJSockAddrPtr mLastKnownAddr;
NATModulePtr mNATModule;
- PJMEDIAEndpointPtr mEndpoint;
PJMEDIAEnvironmentPtr mConfig;
bool mEnableRTCP;
diff --git a/src/PJMEDIATransport.cpp b/src/PJMEDIATransport.cpp
index 2546b8b..4b1e9b8 100644
--- a/src/PJMEDIATransport.cpp
+++ b/src/PJMEDIATransport.cpp
@@ -36,8 +36,9 @@ pjmedia_transport* PJMEDIATransport::getTransport() const
return mTransport;
}
-PJMEDIATransport::PJMEDIATransport(pjmedia_transport* t) :
- mTransport(t)
+PJMEDIATransport::PJMEDIATransport(const PJMEDIAEndpointPtr& endpoint, pjmedia_transport* t) :
+ mTransport(t),
+ mEndpoint(endpoint)
{
}
diff --git a/src/PJMEDIATransport.h b/src/PJMEDIATransport.h
index ca4889f..e6e0bcf 100644
--- a/src/PJMEDIATransport.h
+++ b/src/PJMEDIATransport.h
@@ -34,6 +34,8 @@ namespace PJMEDIARTP
class PJMEDIATransport;
typedef boost::shared_ptr<PJMEDIATransport> PJMEDIATransportPtr;
+class PJMEDIAEndpoint;
+typedef boost::shared_ptr<PJMEDIAEndpoint> PJMEDIAEndpointPtr;
class PJMEDIATransport
{
@@ -57,8 +59,9 @@ public:
protected:
pjmedia_transport* mTransport;
+ PJMEDIAEndpointPtr mEndpoint;
- PJMEDIATransport(pjmedia_transport* t);
+ PJMEDIATransport(const PJMEDIAEndpointPtr& endpoint, pjmedia_transport* t);
AsteriskSCF::Helpers::AddressPtr getLocalAddressImpl();
AsteriskSCF::Helpers::AddressPtr fromInfo(pjmedia_transport_info& info);
diff --git a/src/RTPSession.cpp b/src/RTPSession.cpp
index 5c9e3bc..ce5ba9b 100644
--- a/src/RTPSession.cpp
+++ b/src/RTPSession.cpp
@@ -1205,6 +1205,7 @@ RTPSessionPrx RTPSessionImpl::activate(
mStreamSink->getTelephonyEventSinkStateItem(),
mStreamSource->getTelephonyEventSourceStateItem());
}
+
RTPSessionPrx result = RTPSessionPrx::uncheckedCast(mAdapter->add(this, id));
mTransport->addFacets(mAdapter, id);
return result;
@@ -1232,9 +1233,7 @@ void RTPSessionImpl::destroy()
mAdapter->remove(mTelephonyEventSinkPrx->ice_getIdentity());
}
- /* Since both the source and sink have a pointer back to the session we need to get rid of them,
- * which will in turn get rid of ourselves once we are removed from the ASM.
- */
+ PJMEDIAEndpointPtr tempEndpoint;
{
boost::unique_lock<boost::shared_mutex> lock(mLock);
if (mStreamSource)
@@ -1253,8 +1252,13 @@ void RTPSessionImpl::destroy()
mSessionAdapter.reset();
mTransport.reset();
mRtcpSessionInterface = 0;
+ tempEndpoint = mEndpoint;
mEndpoint.reset();
}
+ //
+ // The session's copy of the endpoint needs to be cleaned up here.
+ //
+ tempEndpoint.reset();
/* All we have to do is remove ourselves from the ASM, our smart pointerness will cause us to
* destruct and then cleanup will occur.
diff --git a/src/SRTPTransport.cpp b/src/SRTPTransport.cpp
index c646a07..6a0230f 100644
--- a/src/SRTPTransport.cpp
+++ b/src/SRTPTransport.cpp
@@ -97,11 +97,11 @@ SRTPTransportPtr SRTPTransport::create(const PJMEDIATransportPtr& transport, con
{
throw InternalInitializationException("Unable to create new ICE media transport");
}
- return SRTPTransportPtr(new SRTPTransport(t, transport));
+ return SRTPTransportPtr(new SRTPTransport(ep, t, transport));
}
-SRTPTransport::SRTPTransport(pjmedia_transport* t, const PJMEDIATransportPtr& origTransport) :
- PJMEDIATransport(t),
+SRTPTransport::SRTPTransport(const PJMEDIAEndpointPtr& endpoint, pjmedia_transport* t, const PJMEDIATransportPtr& origTransport) :
+ PJMEDIATransport(endpoint, t),
mMainTransport(origTransport),
mEnableAuthentication(true),
mEnableEncryption(true),
diff --git a/src/SRTPTransport.h b/src/SRTPTransport.h
index 9e3c25a..1411a63 100644
--- a/src/SRTPTransport.h
+++ b/src/SRTPTransport.h
@@ -55,7 +55,7 @@ private:
bool mEnableEncryption;
bool mStarted;
- SRTPTransport(pjmedia_transport* t, const PJMEDIATransportPtr& origTransport);
+ SRTPTransport(const PJMEDIAEndpointPtr& endpoint, pjmedia_transport* t, const PJMEDIATransportPtr& origTransport);
//
// Hidden and unimplemented.
diff --git a/src/UDPTransport.cpp b/src/UDPTransport.cpp
index 605e613..27a28ad 100644
--- a/src/UDPTransport.cpp
+++ b/src/UDPTransport.cpp
@@ -75,13 +75,13 @@ UDPTransportPtr AsteriskSCF::PJMEDIARTP::UDPTransport::createImpl(const PJMEDIAE
port, 0, &transport);
if (success(result))
{
- return UDPTransportPtr(new UDPTransport(transport));
+ return UDPTransportPtr(new UDPTransport(ep, transport));
}
}
throw InternalInitializationException("Unable to initialize UDP media transport");
}
-UDPTransport::UDPTransport(pjmedia_transport* t) :
- PJMEDIATransport(t)
+UDPTransport::UDPTransport(const PJMEDIAEndpointPtr& endpoint, pjmedia_transport* t) :
+ PJMEDIATransport(endpoint, t)
{
}
diff --git a/src/UDPTransport.h b/src/UDPTransport.h
index 09cd779..86c61b4 100644
--- a/src/UDPTransport.h
+++ b/src/UDPTransport.h
@@ -61,7 +61,7 @@ private:
const RTPConfigurationPtr& configObject,
unsigned minPort, unsigned maxPort, bool expectIPv6);
- UDPTransport(pjmedia_transport* t);
+ UDPTransport(const PJMEDIAEndpointPtr& endpoint, pjmedia_transport* t);
};
} /* End of namespace PJMEDIARTP */
commit c89be9b7c37e4f61b365d8e620b6dadf297462f8
Author: Brent Eagles <beagles at digium.com>
Date: Mon Jul 9 19:29:09 2012 -0230
Transport/endpoint order is pretty important. Fixed very basic crash.
diff --git a/src/RTPSession.cpp b/src/RTPSession.cpp
index 0e389a0..5c9e3bc 100644
--- a/src/RTPSession.cpp
+++ b/src/RTPSession.cpp
@@ -1237,15 +1237,23 @@ void RTPSessionImpl::destroy()
*/
{
boost::unique_lock<boost::shared_mutex> lock(mLock);
- mStreamSource->destroy();
- mStreamSink->destroy();
+ if (mStreamSource)
+ {
+ mStreamSource->destroy();
+ }
+ if (mStreamSink)
+ {
+ mStreamSink->destroy();
+ }
mStreamSource = 0;
mStreamSink = 0;
+ mReceiverReport = 0;
+ mSenderReport = 0;
mSessionAdapter.reset();
- mEndpoint.reset();
mTransport.reset();
mRtcpSessionInterface = 0;
+ mEndpoint.reset();
}
/* All we have to do is remove ourselves from the ASM, our smart pointerness will cause us to
commit 696c728ff5cd8c3d3659e33ed492af68b4ae07ad
Author: Brent Eagles <beagles at digium.com>
Date: Mon Jul 9 19:13:13 2012 -0230
Force an early cleanup of the endpoint. This should clean up threads, pools, etc. for the session.
diff --git a/src/RTPSession.cpp b/src/RTPSession.cpp
index e0c26cc..0e389a0 100644
--- a/src/RTPSession.cpp
+++ b/src/RTPSession.cpp
@@ -1243,6 +1243,7 @@ void RTPSessionImpl::destroy()
mStreamSink = 0;
mSessionAdapter.reset();
+ mEndpoint.reset();
mTransport.reset();
mRtcpSessionInterface = 0;
}
commit d5146877151aaf690908b8710a030acf18a557d9
Author: Brent Eagles <beagles at digium.com>
Date: Mon Jul 9 18:29:29 2012 -0230
Enable the active transport count logging for all build types.
diff --git a/src/TransportMap.cpp b/src/TransportMap.cpp
index e8fc5f5..02bb415 100755
--- a/src/TransportMap.cpp
+++ b/src/TransportMap.cpp
@@ -185,27 +185,20 @@ TransportMap& TransportMap::instance()
TransportMap TransportMap::mInstance;
TransportMap::TransportMap()
- : mTable(65)
-#ifndef _NDEBUG
- , mCounter(0)
-#endif
+ : mTable(65), mCounter(0)
{
}
void TransportMap::incEntries()
{
-#ifndef _NDEBUG
++mCounter;
lg(Debug) << "Transport added: " << mCounter << " active transports";
-#endif
}
void TransportMap::decEntries()
{
-#ifndef _NDEBUG
--mCounter;
lg(Debug) << "Transport removed: " << mCounter << " active transports";
-#endif
}
MapRecord TransportMap::get(const Ice::Int port)
diff --git a/src/TransportMap.h b/src/TransportMap.h
index bcbf5a7..46998b2 100755
--- a/src/TransportMap.h
+++ b/src/TransportMap.h
@@ -64,9 +64,7 @@ private:
boost::shared_mutex mLock;
TransportTable mTable;
-#ifndef _NDEBUG
unsigned long mCounter;
-#endif
static TransportMap mInstance;
commit 33d85fe523555ae4ab637ca62f09da3b6b8a91c7
Author: Brent Eagles <beagles at digium.com>
Date: Mon Jul 9 18:03:31 2012 -0230
Fixed some deadlock conditions that occur pretty readily in live testing.
diff --git a/src/TransportMap.cpp b/src/TransportMap.cpp
index 07d5045..e8fc5f5 100755
--- a/src/TransportMap.cpp
+++ b/src/TransportMap.cpp
@@ -65,7 +65,7 @@ void TransportMap::addTransport(const MapRecord& r, const string& ip, Ice::Int p
{
assert(r.transport);
assert(r.source);
- boost::unique_lock<boost::shared_mutex> lock(mLock);
+ MapRecord oldRecord;
AddressPtr address = r.transport->localAddress();
assert(address);
unsigned tblIndex = address->port() / 1000;
@@ -82,15 +82,25 @@ void TransportMap::addTransport(const MapRecord& r, const string& ip, Ice::Int p
throw AsteriskSCF::Media::RTP::V1::InvalidAddress();
}
- //
- // Lazy initialization of port subtables.
- //
- if (mTable[tblIndex].size() < (arrayIndex + 1))
{
- mTable[tblIndex].resize(arrayIndex + 1);
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+
+ //
+ // Lazy initialization of port subtables.
+ //
+ if (mTable[tblIndex].size() < (arrayIndex + 1))
+ {
+ mTable[tblIndex].resize(arrayIndex + 1);
+ }
+
+ oldRecord = mTable[tblIndex][arrayIndex];
+ mTable[tblIndex][arrayIndex] = r;
+ if (oldRecord.transport == 0)
+ {
+ incEntries();
+ }
}
- MapRecord oldRecord = mTable[tblIndex][arrayIndex];
if (oldRecord.transport != 0)
{
lg(Debug) << "Updating existing transport entry";
@@ -102,7 +112,6 @@ void TransportMap::addTransport(const MapRecord& r, const string& ip, Ice::Int p
}
}
- mTable[tblIndex][arrayIndex] = r;
pj_status_t result;
if (rtcp)
{
@@ -118,6 +127,8 @@ void TransportMap::addTransport(const MapRecord& r, const string& ip, Ice::Int p
}
if (result != PJ_SUCCESS)
{
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+
//
// Do we revert the system or simply set to a sane initial
// state? We cannot really re-attach because we don't have all
@@ -125,39 +136,42 @@ void TransportMap::addTransport(const MapRecord& r, const string& ip, Ice::Int p
// state for now.
//
mTable[tblIndex][arrayIndex] = MapRecord();
+ decEntries();
throw AsteriskSCF::Media::RTP::V1::InvalidAddress();
}
- if (oldRecord.transport == 0)
- {
- incEntries();
- }
}
void TransportMap::removeTransport(const MapRecord& r)
{
- boost::unique_lock<boost::shared_mutex> lock(mLock);
- AddressPtr address = r.transport->localAddress();
- assert(address);
- unsigned tblIndex = address->port() / 1000;
- unsigned arrayIndex = address->port() % 1000;
- if (mTable[tblIndex].size() < (arrayIndex + 1) || mTable[tblIndex][arrayIndex].transport == 0)
+ MapRecord toRemove;
+ int port;
{
- lg(Debug) << "Attempting to remove a non-existant transport";
- return;
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ AddressPtr address = r.transport->localAddress();
+ assert(address);
+ port = address->port();
+ unsigned tblIndex = port / 1000;
+ unsigned arrayIndex = port % 1000;
+ if (mTable[tblIndex].size() < (arrayIndex + 1) || mTable[tblIndex][arrayIndex].transport == 0)
+ {
+ lg(Debug) << "Attempting to remove a non-existent transport";
+ return;
+ }
+ toRemove = mTable[tblIndex][arrayIndex];
+ mTable[tblIndex][arrayIndex] = MapRecord();
+ decEntries();
}
- MapRecord toRemove = mTable[tblIndex][arrayIndex];
+
if (toRemove.transport != 0)
{
pjmedia_transport* t = toRemove.transport->getTransport();
if (t)
{
- pjmedia_transport_detach(t, reinterpret_cast<void*>(address->port()));
+ pjmedia_transport_detach(t, reinterpret_cast<void*>(port));
}
}
-
- mTable[tblIndex][arrayIndex] = MapRecord();
- decEntries();
+
}
TransportMap& TransportMap::instance()
commit f0e6255ca9fe653b034055d82df3cd48319ad3dc
Author: Brent Eagles <beagles at digium.com>
Date: Mon Jul 9 17:30:47 2012 -0230
Fixed several silly mistakes with rtp/rtcp setup.
diff --git a/src/RTPSource.cpp b/src/RTPSource.cpp
index 5bacf4b..c4900d9 100644
--- a/src/RTPSource.cpp
+++ b/src/RTPSource.cpp
@@ -464,7 +464,7 @@ Ice::Int StreamSourceRTPImpl::getLocalPort(const Ice::Current&)
*/
void StreamSourceRTPImpl::setRemoteDetails(const string& address, Ice::Int port)
{
- TransportMap::instance().addTransport(MapRecord(mImpl->mTransport, this), address, port);
+ TransportMap::instance().addTransport(MapRecord(mImpl->mTransport, this), address, port, false);
}
@@ -473,7 +473,15 @@ void StreamSourceRTPImpl::setRemoteDetails(const string& address, Ice::Int port)
*/
void StreamSourceRTPImpl::setRemoteRtcpDetails(const std::string& address, Ice::Int port)
{
- TransportMap::instance().addTransport(MapRecord(mImpl->mTransport, this), address, port);
+ TransportMap::instance().addTransport(MapRecord(mImpl->mTransport, this), address, port, true);
+ if (!mImpl->mTimer && (mImpl->mTimer = new IceUtil::Timer()))
+ {
+ RtcpTransmissionPtr transmission;
+ if ((transmission = new RtcpTransmission(mImpl->mSessionAdapter, mImpl->mSink, mImpl->mTransport, mImpl->mThreadDescriptor)))
+ {
+ mImpl->mTimer->scheduleRepeated(transmission, IceUtil::Time::milliSeconds(PJMEDIA_RTCP_INTERVAL));
+ }
+ }
}
/**
@@ -683,6 +691,7 @@ bool StreamSourceRTPImpl::receiveRTP(void *userdata, void *packet, size_t size)
lg(Error) << "Exception caught while attempting to write media to RTP sink " << (*sink);
}
}
+ return true;
}
/**
@@ -714,4 +723,5 @@ bool StreamSourceRTPImpl::receiveRTCP(void *userdata, void *packet, size_t size)
{
(*listener)->sourceStatisticsUpdated(AsteriskSCF::Operations::createContext(), mImpl->mSource, statistics);
}
+ return true;
}
diff --git a/src/TransportMap.cpp b/src/TransportMap.cpp
index 923e344..07d5045 100755
--- a/src/TransportMap.cpp
+++ b/src/TransportMap.cpp
@@ -61,7 +61,7 @@ bool isCompatible(const pj_sockaddr& addr, const pjmedia_transport_info& info)
}
-void TransportMap::addTransport(const MapRecord& r, const string& ip, Ice::Int port)
+void TransportMap::addTransport(const MapRecord& r, const string& ip, Ice::Int port, bool rtcp)
{
assert(r.transport);
assert(r.source);
@@ -85,33 +85,37 @@ void TransportMap::addTransport(const MapRecord& r, const string& ip, Ice::Int p
//
// Lazy initialization of port subtables.
//
- if (mTable[tblIndex].size() < arrayIndex)
+ if (mTable[tblIndex].size() < (arrayIndex + 1))
{
- mTable[tblIndex].resize(arrayIndex);
+ mTable[tblIndex].resize(arrayIndex + 1);
}
MapRecord oldRecord = mTable[tblIndex][arrayIndex];
- if (oldRecord.transport)
+ if (oldRecord.transport != 0)
{
- if (oldRecord.transport == r.transport && oldRecord.source == r.source && port == address->port())
+ lg(Debug) << "Updating existing transport entry";
+ AddressPtr oldAddr = oldRecord.transport->localAddress();
+ if (oldAddr)
{
- //
- // We do not have to do anything here because doing what
- // comes next will not change anything. This can happen if
- // setRtcpRemoteDetails is called in addition to
- // setRemoteDetails.
- //
- return;
+ pjmedia_transport* t = oldRecord.transport->getTransport();
+ pjmedia_transport_detach(t, reinterpret_cast<void*>(oldAddr->port()));
}
- lg(Debug) << "Overwriting existing transport entry and detaching transport.";
- pjmedia_transport* t = oldRecord.transport->getTransport();
- pjmedia_transport_detach(t, reinterpret_cast<void*>(port));
}
mTable[tblIndex][arrayIndex] = r;
- pj_status_t result = pjmedia_transport_attach(r.transport->getTransport(),
- reinterpret_cast<void*>(port), &addr, &info.src_rtcp_name, pj_sockaddr_get_len(&addr), &TransportMap::receiveRTP,
- &TransportMap::receiveRTCP);
+ pj_status_t result;
+ if (rtcp)
+ {
+ result = pjmedia_transport_attach(r.transport->getTransport(),
+ reinterpret_cast<void*>(address->port()), &info.src_rtp_name, &addr, pj_sockaddr_get_len(&addr), &TransportMap::receiveRTP,
+ &TransportMap::receiveRTCP);
+ }
+ else
+ {
+ result = pjmedia_transport_attach(r.transport->getTransport(),
+ reinterpret_cast<void*>(address->port()), &addr, &info.src_rtcp_name, pj_sockaddr_get_len(&addr), &TransportMap::receiveRTP,
+ &TransportMap::receiveRTCP);
+ }
if (result != PJ_SUCCESS)
{
//
@@ -123,8 +127,11 @@ void TransportMap::addTransport(const MapRecord& r, const string& ip, Ice::Int p
mTable[tblIndex][arrayIndex] = MapRecord();
throw AsteriskSCF::Media::RTP::V1::InvalidAddress();
}
-
- incEntries();
+
+ if (oldRecord.transport == 0)
+ {
+ incEntries();
+ }
}
void TransportMap::removeTransport(const MapRecord& r)
@@ -134,13 +141,13 @@ void TransportMap::removeTransport(const MapRecord& r)
assert(address);
unsigned tblIndex = address->port() / 1000;
unsigned arrayIndex = address->port() % 1000;
- if (mTable[tblIndex].size() < arrayIndex || !mTable[tblIndex][arrayIndex].transport)
+ if (mTable[tblIndex].size() < (arrayIndex + 1) || mTable[tblIndex][arrayIndex].transport == 0)
{
lg(Debug) << "Attempting to remove a non-existant transport";
return;
}
MapRecord toRemove = mTable[tblIndex][arrayIndex];
- if (toRemove.transport)
+ if (toRemove.transport != 0)
{
pjmedia_transport* t = toRemove.transport->getTransport();
if (t)
@@ -164,8 +171,9 @@ TransportMap& TransportMap::instance()
TransportMap TransportMap::mInstance;
TransportMap::TransportMap()
+ : mTable(65)
#ifndef _NDEBUG
- : mCounter(0)
+ , mCounter(0)
#endif
{
}
@@ -196,7 +204,7 @@ MapRecord TransportMap::get(const Ice::Int port)
// Bounds check
//
assert(tblIndex >= 0 && tblIndex <= mTable.size());
- if (mTable[tblIndex].empty() || mTable[tblIndex].size() < arrayIndex)
+ if (mTable[tblIndex].size() < (arrayIndex + 1))
{
throw TransportNotFound();
}
@@ -218,7 +226,7 @@ void TransportMap::receiveRTP(void* userdata, void* packet, pj_ssize_t size)
return;
}
- int portKey = reinterpret_cast<int>(userdata);
+ long portKey = reinterpret_cast<long>(userdata);
try
{
MapRecord transportRec = TransportMap::instance().get(portKey);
@@ -242,7 +250,7 @@ void TransportMap::receiveRTCP(void* userdata, void* packet, pj_ssize_t size)
return;
}
- int portKey = reinterpret_cast<int>(userdata);
+ long portKey = reinterpret_cast<long>(userdata);
try
{
MapRecord transportRec = TransportMap::instance().get(portKey);
diff --git a/src/TransportMap.h b/src/TransportMap.h
index dd5b9b3..bcbf5a7 100755
--- a/src/TransportMap.h
+++ b/src/TransportMap.h
@@ -19,7 +19,6 @@
#include "PJMEDIATransport.h"
#include "RTPSource.h"
#include <vector>
-#include <array>
#include <boost/thread.hpp>
#include <pjlib.h>
@@ -48,7 +47,7 @@ class TransportMap
{
public:
- void addTransport(const MapRecord& transport, const std::string& address, Ice::Int port);
+ void addTransport(const MapRecord& transport, const std::string& address, Ice::Int port, bool rtcp);
void removeTransport(const MapRecord& transport);
static TransportMap& instance();
@@ -61,7 +60,7 @@ private:
// that an RTP component that loads up will likely load up again!
//
typedef std::vector<MapRecord> TransportArray;
- typedef std::array<TransportArray, 65> TransportTable;
+ typedef std::vector<TransportArray> TransportTable;
boost::shared_mutex mLock;
TransportTable mTable;
diff --git a/test/TestRTPpjmedia.cpp b/test/TestRTPpjmedia.cpp
index 6419848..5fe90ae 100644
--- a/test/TestRTPpjmedia.cpp
+++ b/test/TestRTPpjmedia.cpp
@@ -412,6 +412,11 @@ BOOST_AUTO_TEST_CASE(RTPSessionRetry)
BOOST_CHECK_EQUAL(expected, actual);
expected->release();
}
+ catch (const std::exception& ex)
+ {
+ expected->release();
+ BOOST_FAIL(ex.what());
+ }
catch(...)
{
expected->release();
commit 275d7d2461aee6c129f4f4d5e5bfa2e86b93166d
Author: Brent Eagles <beagles at digium.com>
Date: Mon Jul 9 15:52:06 2012 -0230
Add a TransportMap helper class to help clean up the RTPSession shutdown code.
diff --git a/config/RTPConfigurator.py b/config/RTPConfigurator.py
old mode 100644
new mode 100755
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 06d14a3..265a3be 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -44,6 +44,8 @@ astscf_component_add_files(MediaRTPPJMEDIA SRTPConfiguration.cpp)
astscf_component_add_files(MediaRTPPJMEDIA SRTPConfiguration.h)
astscf_component_add_files(MediaRTPPJMEDIA SRTPTransport.cpp)
astscf_component_add_files(MediaRTPPJMEDIA SRTPTransport.h)
+astscf_component_add_files(MediaRTPPJMEDIA TransportMap.cpp)
+astscf_component_add_files(MediaRTPPJMEDIA TransportMap.h)
astscf_component_add_slices(MediaRTPPJMEDIA PROJECT AsteriskSCF/Replication/MediaRTPPJMEDIA/RTPStateReplicationIf.ice)
astscf_component_add_slices(MediaRTPPJMEDIA PROJECT AsteriskSCF/Configuration/MediaRTPPJMEDIA/RTPConfigurationIf.ice)
astscf_component_add_boost_libraries(MediaRTPPJMEDIA core thread)
diff --git a/src/Configuration.h b/src/Configuration.h
old mode 100644
new mode 100755
diff --git a/src/PJMEDIATransport.cpp b/src/PJMEDIATransport.cpp
index c0047b4..2546b8b 100644
--- a/src/PJMEDIATransport.cpp
+++ b/src/PJMEDIATransport.cpp
@@ -65,22 +65,6 @@ AddressPtr PJMEDIATransport::remoteAddress()
return fromInfo(info);
}
-void PJMEDIATransport::attachLabel(const std::string& label)
-{
- boost::shared_ptr<char> data(new char[label.size() + 1]);
- strcpy(data.get(), label.c_str());
- if (mLabelData)
- {
- pjmedia_transport_detach(mTransport, mLabelData.get());
- }
- mLabelData = data;
-}
-
-boost::shared_ptr<char> PJMEDIATransport::getLabelData()
-{
- return mLabelData;
-}
-
AddressPtr PJMEDIATransport::getLocalAddressImpl()
{
pjmedia_transport_info info;
diff --git a/src/PJMEDIATransport.h b/src/PJMEDIATransport.h
index f4754f5..ca4889f 100644
--- a/src/PJMEDIATransport.h
+++ b/src/PJMEDIATransport.h
@@ -54,13 +54,9 @@ public:
* down the objects with lots of unnecessary code.
**/
virtual void addFacets(const Ice::ObjectAdapterPtr&, const Ice::Identity&) {}
-
- void attachLabel(const std::string& label);
- boost::shared_ptr<char> getLabelData();
protected:
pjmedia_transport* mTransport;
- boost::shared_ptr<char> mLabelData;
PJMEDIATransport(pjmedia_transport* t);
diff --git a/src/RTPSession.cpp b/src/RTPSession.cpp
index 2221b5e..e0c26cc 100644
--- a/src/RTPSession.cpp
+++ b/src/RTPSession.cpp
@@ -780,8 +780,6 @@ void RTPSessionImpl::release(const Ice::Current&)
{
// Remove everything from the state replicator if present
removeState(mSessionStateItem, mStreamSink->getStateItem(), mStreamSource->getStateItem());
-
-
destroy();
}
@@ -1209,10 +1207,6 @@ RTPSessionPrx RTPSessionImpl::activate(
}
RTPSessionPrx result = RTPSessionPrx::uncheckedCast(mAdapter->add(this, id));
mTransport->addFacets(mAdapter, id);
- //
- // The source is an active element and must be explicitly "turned on"
- //
- mStreamSource->activate();
return result;
}
catch (...)
diff --git a/src/RTPSource.cpp b/src/RTPSource.cpp
index a53022b..5bacf4b 100644
--- a/src/RTPSource.cpp
+++ b/src/RTPSource.cpp
@@ -44,6 +44,8 @@
#include <AsteriskSCF/System/Component/ReplicaIf.h>
#include <AsteriskSCF/Operations/OperationContext.h>
+#include "TransportMap.h"
+
using namespace std;
using namespace AsteriskSCF::Core::Discovery::V1;
using namespace AsteriskSCF::Media;
@@ -64,7 +66,6 @@ using namespace AsteriskSCF::SessionCommunications::V1;
namespace
{
Logger lg = getLoggerFactory().getLogger("AsteriskSCF.MediaRTP");
-
}
/**
@@ -84,7 +85,6 @@ public:
*/
typedef boost::shared_ptr<ThreadDescWrapper> ThreadDescWrapperPtr;
-
/**
* TimerTask implementation which sends RTCP at a defined interval.
*/
@@ -263,49 +263,6 @@ public:
typedef boost::shared_ptr<StreamSourceRTPImplPriv> StreamSourceRTPImplPrivPtr;
/**
- * A helper class to help keep track of classes that are associated with a
- * media session without having memory management headaches of associating
- * pointers to live objects with pjmedia transports.
- */
-class StreamTable
-{
-public:
- void add(const std::string& lbl, const StreamSourceRTPImplPrivPtr& info)
- {
- boost::unique_lock<boost::shared_mutex> lock(mLock);
- InfoMap::const_iterator iter = mMap.find(lbl);
- if (iter == mMap.end())
- {
- mMap[lbl] = info;
- }
- }
-
- void remove(const std::string& lbl)
- {
- boost::unique_lock<boost::shared_mutex> lock(mLock);
- mMap.erase(lbl);
- }
-
- StreamSourceRTPImplPrivPtr get(const std::string& lbl)
- {
- boost::shared_lock<boost::shared_mutex> lock(mLock);
- InfoMap::const_iterator iter = mMap.find(lbl);
- if (iter != mMap.end())
- {
- return iter->second;
- }
- return StreamSourceRTPImplPrivPtr();
- }
-public:
- boost::shared_mutex mLock;
-
- typedef std::map<std::string, StreamSourceRTPImplPrivPtr> InfoMap;
- InfoMap mMap;
-};
-
-StreamTable streamTable;
-
-/**
* Constructor for the StreamSourceRTPImplPriv class.
*/
StreamSourceRTPImplPriv::StreamSourceRTPImplPriv(const SessionAdapterPtr& session,
@@ -344,8 +301,7 @@ StreamSourceRTPImpl::StreamSourceRTPImpl(const SessionAdapterPtr& session,
const PJMEDIATransportPtr& transport, const string& sessionId,
const StreamSourceRTPPrx& source,
const StreamSinkRTPPrx& sink) :
- mImpl(new StreamSourceRTPImplPriv(session, transport, sessionId, source, sink)),
- mLabelBuffer(0)
+ mImpl(new StreamSourceRTPImplPriv(session, transport, sessionId, source, sink))
{
}
@@ -502,29 +458,106 @@ Ice::Int StreamSourceRTPImpl::getLocalPort(const Ice::Current&)
return 0;
}
+
/**
- * Function which is called when RTP media is received.
+ * API call which sets up our pjmedia transport and allows media to be sent and received.
*/
-static void receiveRTP(void *userdata, void *packet, pj_ssize_t size)
+void StreamSourceRTPImpl::setRemoteDetails(const string& address, Ice::Int port)
{
- /* Ensure that no errors occurred when reading this packet in */
- if (size < 0)
+ TransportMap::instance().addTransport(MapRecord(mImpl->mTransport, this), address, port);
+}
+
+
+/**
+ * API call which sets up our pjmedia transport and allows media to be sent and received.
+ */
+void StreamSourceRTPImpl::setRemoteRtcpDetails(const std::string& address, Ice::Int port)
+{
+ TransportMap::instance().addTransport(MapRecord(mImpl->mTransport, this), address, port);
+}
+
+/**
+ * API call which returns a pointer to the source state item.
+ */
+RTPStreamSourceStateItemPtr StreamSourceRTPImpl::getStateItem()
+{
+ boost::shared_lock<boost::shared_mutex> lock(mImpl->mLock);
+ return mImpl->mSourceStateItem;
+}
+
+RTPTelephonyEventSourceStateItemPtr StreamSourceRTPImpl::getTelephonyEventSourceStateItem()
+{
+ boost::shared_lock<boost::shared_mutex> lock(mImpl->mLock);
+ if (mImpl->mDestroyed && mImpl->mTelephonyEventSource)
{
- lg(Error) << "We attempted to read data from an RTP session but failed.";
- return;
+ return mImpl->mTelephonyEventSource->getStateItem();
}
+ return 0;
+}
- const char* infoLabel = static_cast<const char*>(userdata);
- if (!infoLabel)
+void StreamSourceRTPImpl::setSinksImpl(const AsteriskSCF::Media::V1::StreamSinkSeq& sinks)
+{
+ boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
+ if (!mImpl->mDestroyed)
{
- lg(Error) << "Attempted to process an RTP packet with no associated session information.";
- return;
+ mImpl->mSourceStateItem->sinks = sinks;
}
- StreamSourceRTPImplPrivPtr impl = streamTable.get(infoLabel);
- if (!impl)
+}
+
+FramePayloadPtr StreamSourceRTPImplPriv::decodeAudioPayload(const Ice::ByteSeq& toDecode, const AudioFormatPtr& audioFormat)
+{
+ if (audioFormat->sampleSize == 8)
{
- return;
+ return new ByteSeqPayload(toDecode);
+ }
+ else if (audioFormat->sampleSize == 16)
+ {
+ Ice::ShortSeq shortPayload((Ice::Short*) &toDecode.front(),
+ (Ice::Short*) &toDecode[toDecode.size()]);
+
+ std::transform(shortPayload.begin(), shortPayload.end(),
+ shortPayload.begin(), boost::asio::detail::socket_ops::network_to_host_short);
+
+ return new ShortSeqPayload(shortPayload);
+ }
+ return 0;
+}
+
+FramePayloadPtr StreamSourceRTPImplPriv::decodeVideoPayload(const Ice::ByteSeq& toDecode, const VideoFormatPtr&)
+{
+ //Assume for now video payloads use 8-bit samples...
+ return new ByteSeqPayload(toDecode);
+}
+
+void StreamSourceRTPImpl::destroy()
+{
+ PJMEDIATransportPtr transport;
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
+ if (mImpl->mDestroyed)
+ {
+ return;
+ }
+ transport = mImpl->mTransport;
+ mImpl->mDestroyed = true;
+ mImpl->mSourceStateItem = 0;
+ mImpl->mTelephonyEventSource = 0;
+ mImpl->mSessionAdapter.reset();
+ // Destroy the RTCP transmission timer if it exists
+ if (mImpl->mTimer)
+ {
+ mImpl->mTimer->destroy();
+ mImpl->mTimer = 0;
+ }
}
+ TransportMap::instance().removeTransport(MapRecord(transport, this));
+}
+
+/**
+ * Function which is called when RTP media is received.
+ */
+bool StreamSourceRTPImpl::receiveRTP(void *userdata, void *packet, size_t size)
+{
//////////////////////////////////////////////////////////////////////////////////////////////////
// WARNING!
// There is a lot of stuff happening from here down that is done with
@@ -536,30 +569,30 @@ static void receiveRTP(void *userdata, void *packet, pj_ssize_t size)
Ice::Byte* payload;
unsigned int payload_size;
- boost::shared_lock<boost::shared_mutex> lock(impl->mLock);
- if (impl->mDestroyed)
+ boost::shared_lock<boost::shared_mutex> lock(mImpl->mLock);
+ if (mImpl->mDestroyed)
{
- return;
+ return false;
}
/* We have to cast 'size' to an 'int' here so the compiler won't warn about
* doing it implicitly.
*/
- pj_status_t status = pjmedia_rtp_decode_rtp(&impl->mIncomingSession, packet, (int) size, &header,
+ pj_status_t status = pjmedia_rtp_decode_rtp(&mImpl->mIncomingSession, packet, (int) size, &header,
(const void**)&payload, &payload_size);
if (status != PJ_SUCCESS)
{
lg(Error) << "We read an RTP packet of size " << size << " in but failed to decode it.";
- return;
+ return true;
}
// Update RTP stack information before writing to sinks, it's fine to do it
pjmedia_rtp_status rtpStatus;
- pjmedia_rtp_session_update2(&impl->mIncomingSession, header, &rtpStatus, PJ_FALSE);
+ pjmedia_rtp_session_update2(&mImpl->mIncomingSession, header, &rtpStatus, PJ_FALSE);
// Update RTCP information
- pjmedia_rtcp_rx_rtp2(impl->mSessionAdapter->getRtcpSession(), pj_ntohs(header->seq), pj_ntohl(header->ts),
+ pjmedia_rtcp_rx_rtp2(mImpl->mSessionAdapter->getRtcpSession(), pj_ntohs(header->seq), pj_ntohl(header->ts),
payload_size, ((rtpStatus.status.value && rtpStatus.status.flag.bad) || !payload_size)
? PJ_TRUE : PJ_FALSE);
@@ -567,29 +600,29 @@ static void receiveRTP(void *userdata, void *packet, pj_ssize_t size)
if (rtpStatus.status.value && rtpStatus.status.flag.badssrc)
{
std::vector<AsteriskSCF::Media::RTCP::V1::InformationListenerPrx> listeners =
- impl->mSessionAdapter->getReceiverReportListeners();
+ mImpl->mSessionAdapter->getReceiverReportListeners();
for (std::vector<AsteriskSCF::Media::RTCP::V1::InformationListenerPrx>::const_iterator listener = listeners.begin();
listener != listeners.end();
++listener)
{
- (*listener)->sourceSsrcChanged(AsteriskSCF::Operations::createContext(), impl->mSource,
- impl->mIncomingSession.peer_ssrc);
+ (*listener)->sourceSsrcChanged(AsteriskSCF::Operations::createContext(), mImpl->mSource,
+ mImpl->mIncomingSession.peer_ssrc);
}
}
- if (impl->mSourceStateItem->sinks.empty())
+ if (mImpl->mSourceStateItem->sinks.empty())
{
// No sinks present so frames can not go anywhere
- return;
+ return true;
}
- FormatPtr mediaformat = impl->mSessionAdapter->getFormat(header->pt);
+ FormatPtr mediaformat = mImpl->mSessionAdapter->getFormat(header->pt);
if (!mediaformat)
{
// If this is for a payload we don't know about just drop the frame
- return;
+ return true;
}
FrameSeq frames;
@@ -603,7 +636,7 @@ static void receiveRTP(void *userdata, void *packet, pj_ssize_t size)
AudioFramePtr frame = new AudioFrame();
Ice::ByteSeq bytePayload(payload, payload + payload_size);
- frame->payload = impl->decodeAudioPayload(bytePayload, audioformat);
+ frame->payload = mImpl->decodeAudioPayload(bytePayload, audioformat);
// Populate the common data
frame->mediaFormat = mediaformat;
@@ -618,7 +651,7 @@ static void receiveRTP(void *userdata, void *packet, pj_ssize_t size)
VideoFramePtr frame = new VideoFrame();
Ice::ByteSeq bytePayload(payload, payload + payload_size);
- frame->payload = impl->decodeVideoPayload(bytePayload, videoformat);
+ frame->payload = mImpl->decodeVideoPayload(bytePayload, videoformat);
frame->mediaFormat = mediaformat;
frame->timestamp = header->ts;
@@ -628,17 +661,17 @@ static void receiveRTP(void *userdata, void *packet, pj_ssize_t size)
}
else if ((rfc4733 = RFC4733Ptr::dynamicCast(mediaformat)))
{
- impl->mTelephonyEventSource->read(header, payload);
+ mImpl->mTelephonyEventSource->read(header, payload);
}
if (frames.empty())
{
// If the media format ended up being a type we don't understand don't bother writing it out
- return;
+ return true;
}
- for (StreamSinkSeq::iterator sink = impl->mSourceStateItem->sinks.begin();
- sink != impl->mSourceStateItem->sinks.end();
+ for (StreamSinkSeq::iterator sink = mImpl->mSourceStateItem->sinks.begin();
+ sink != mImpl->mSourceStateItem->sinks.end();
++sink)
{
try
@@ -655,244 +688,30 @@ static void receiveRTP(void *userdata, void *packet, pj_ssize_t size)
/**
* Function which is called when RTCP is received.
*/
-static void receiveRTCP(void *userdata, void *packet, pj_ssize_t size)
+bool StreamSourceRTPImpl::receiveRTCP(void *userdata, void *packet, size_t size)
{
- /* Ensure that no errors occurred when reading this packet in */
- if (size < 0)
- {
- lg(Error) << "We attempted to read data from an RTCP session but failed.";
- return;
- }
-
- const char* infoLabel = static_cast<const char*>(userdata);
- if (!infoLabel)
... 1507 lines suppressed ...
--
asterisk-scf/release/media_rtp_pjmedia.git
More information about the asterisk-scf-commits
mailing list