[asterisk-scf-commits] asterisk-scf/integration/media_rtp_pjmedia.git branch "srtp-support" updated.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Fri Jul 15 14:29:41 CDT 2011
branch "srtp-support" has been updated
via e1a98078658030c9716217c849dc2bc960ae05f7 (commit)
via e495e075f2f28738b0d8236dd76b353316240f41 (commit)
from b4ebf6392981bbfbf8d5a31a286feac32cd82903 (commit)
Summary of changes:
.../MediaRTPPJMedia/RtpStateReplicationIf.ice | 4 +-
src/RTPConfiguration.cpp | 4 +-
src/RTPSession.cpp | 4 +-
src/RTPSink.cpp | 9 +-
src/RTPSource.cpp | 136 ++++++++++++++------
src/RTPSource.h | 7 +-
test/TestRTPpjmedia.cpp | 20 ++--
7 files changed, 120 insertions(+), 64 deletions(-)
- Log -----------------------------------------------------------------
commit e1a98078658030c9716217c849dc2bc960ae05f7
Merge: b4ebf63 e495e07
Author: Brent Eagles <beagles at digium.com>
Date: Fri Jul 15 14:52:30 2011 -0230
Merge branch 'master' into srtp-support with resolved conflicts.
diff --cc slice/AsteriskSCF/Replication/MediaRTPPJMedia/RtpStateReplicationIf.ice
index 202c998,b9b08d7..ff9d0d2
--- a/slice/AsteriskSCF/Replication/MediaRTPPJMedia/RtpStateReplicationIf.ice
+++ b/slice/AsteriskSCF/Replication/MediaRTPPJMedia/RtpStateReplicationIf.ice
@@@ -78,9 -78,6 +78,9 @@@ module V
string mComparatorId;
};
+ /**
- * TODO: Ice classe members shouldn't have m prefixes.
++ * TODO: Data members in Slice defined classes should not have `m' prefixes.
+ */
class RtpSessionStateItem extends RtpStateItem
{
Ice::Identity mSessionIdentity;
diff --cc src/RTPConfiguration.cpp
index 378b569,5e8ee26..1359fe4
--- a/src/RTPConfiguration.cpp
+++ b/src/RTPConfiguration.cpp
@@@ -14,142 -14,23 +14,142 @@@
* at the top of the source tree.
*/
+#include "RtpConfigurationIf.h"
+#include "RTPConfiguration.h"
+
#include <IceUtil/UUID.h>
+#include <AsteriskSCF/System/Component/ConfigurationIf.h>
#include <boost/thread.hpp>
-#include <boost/shared_ptr.hpp>
#include <boost/thread/shared_mutex.hpp>
-#include <AsteriskSCF/System/Component/ConfigurationIf.h>
-
-#include "RtpConfigurationIf.h"
-#include "RTPConfiguration.h"
-
using namespace AsteriskSCF::System::Configuration::V1;
+using namespace AsteriskSCF::PJMediaRTP;
+using namespace std;
using namespace AsteriskSCF::Configuration::MediaRTPPJMedia::V1;
-class ConfigurationServiceImplPriv
+/**
+ * Implementation of the configuration service.
+ */
+class ConfigurationServiceServant : virtual public ConfigurationServiceImpl
{
public:
+
+ /**
+ * AsteriskSCF::System::Configuration::V1 interface. Slice to C++ mapping.
+ */
+ AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq
+ getConfiguration(const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq&, const Ice::Current&);
+
+ AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq
+ getConfigurationAll(const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq&, const Ice::Current&);
+
+ AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq
+ getConfigurationGroups(const Ice::Current&);
+
+ void setConfiguration(const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq&, const Ice::Current&);
+
+ void removeConfigurationItems(const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq&,
+ const Ice::Current&);
+ void removeConfigurationGroups(const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq&,
+ const Ice::Current&);
+
+ /**
+ * AsteriskSCF::PJMediaRTP::RTPConfiguration interface.
+ */
+ int getStartPort();
+ int getEndPort();
+ int getWorkerThreadCount();
+
+ std::string getBindIPv4Address();
+ std::string getBindIPv6Address();
+
+ PJLibConfigurationPtr libConfig() const
+ {
+ return PJLibConfigurationPtr(); // XXX this isn't implemented here at this time.
+ }
+
+ NATConfigPtr natConfig() const
+ {
+ return mNATConfig;
+ }
+
+ ICEConfigurationPtr ICEConfig() const
+ {
+ return mICEConfig;
+ }
+
+ SRTPConfigurationPtr srtpConfig() const
+ {
+ return mSRTPConfig;
+ }
+
+ ConfigurationServicePrx activate(const Ice::ObjectAdapterPtr& objectAdapter, const string& id);
+
+ void replaceConfig(const ICEConfigurationPtr& newConfig)
+ {
+ mICEConfig = newConfig;
+ }
+
+ void replaceConfig(const NATConfigPtr& newConfig)
+ {
+ mNATConfig = newConfig;
+ }
+
+ void replaceConfig(const SRTPConfigurationPtr& newConfig)
+ {
+ mSRTPConfig = newConfig;
+ }
+
+ RtpGeneralGroupPtr getGeneralGroup()
+ {
+ return mGeneralGroup;
+ }
+
+ RTPICEConfigurationGroupPtr getICEConfigurationGroup()
+ {
+ return mICEConfiguration;
+ }
+
+ void replaceGroup(const RtpGeneralGroupPtr& group)
+ {
+ mGeneralGroup = group;
+
+ //
- // There is most likely a lock already held on the configuration data
++ // replaceGroup methods are only called within the scope of an existing lock so we do not need one
++ // here.
+ //
-
+ if (mGeneralGroup)
+ {
+ ConfigurationItemDict::const_iterator item =
+ mGeneralGroup->configurationItems.find(EnableSRTPItemName);
+ if (item != mGeneralGroup->configurationItems.end())
+ {
+ EnableSRTPItemPtr srtpItem = EnableSRTPItemPtr::dynamicCast(item->second);
+ if (!mSRTPConfig)
+ {
+ mSRTPConfig = SRTPConfiguration::create(srtpItem->enabled);
+ }
+ else
+ {
+ //
+ // No point in allocating if it's the same.
+ //
+ if (mSRTPConfig->isSRTPEnabled() != srtpItem->enabled)
+ {
+ mSRTPConfig = SRTPConfiguration::create(srtpItem->enabled);
+ }
+ }
+ }
+ }
+ }
+
+ void replaceGroup(const RTPICEConfigurationGroupPtr& group)
+ {
+ mICEConfiguration = group;
+ }
+
+private:
/**
* General RTP configuration
*/
diff --cc src/RTPSession.cpp
index bf5258d,05ce690..3fecc5c
--- a/src/RTPSession.cpp
+++ b/src/RTPSession.cpp
@@@ -42,92 -39,45 +42,92 @@@ using namespace AsteriskSCF::Media::RTP
using namespace AsteriskSCF::Replication::MediaRTPPJMedia::V1;
using namespace AsteriskSCF::System::Component::V1;
using namespace AsteriskSCF::Discovery;
+using namespace AsteriskSCF::PJMediaRTP;
/**
- * Default value for where we should start allocating RTP and RTCP ports from.
- */
-#define DEFAULT_RTP_PORT_MINIMUM 10000
-
-/**
- * Default value for where we should stop allocating RTP and RTCP ports.
+ * Implementation of the RTPSession interface as defined in MediaRTPIf.ice
*/
-#define DEFAULT_RTP_PORT_MAXIMUM 20000
-
-/**
- * Private implementation details for the RTPSessionImpl class.
- */
-class RTPSessionImplPriv
+class RTPSessionImpl : public AsteriskSCF::Media::RTP::V1::SRTPSession
{
public:
- RTPSessionImplPriv(const Ice::ObjectAdapterPtr& adapter, const FormatSeq& formats,
- const ReplicaPtr& replicaService,
- const AsteriskSCF::Discovery::SmartProxy<RtpStateReplicatorPrx>& stateReplicator) :
- mAdapter(adapter), mFormats(formats),
- mSessionStateItem(new RtpSessionStateItem()),
- mReplicaService(replicaService), mStateReplicator(stateReplicator) { };
- ~RTPSessionImplPriv();
+ RTPSessionImpl(const Ice::ObjectAdapterPtr&,
+ const string& id,
+ const PJMediaEnvironmentPtr& env,
+ const AsteriskSCF::Media::RTP::V1::RTPServiceLocatorParamsPtr& params,
+ const AsteriskSCF::System::Component::V1::ReplicaPrx& replicaControl,
+ const AsteriskSCF::Discovery::SmartProxy<AsteriskSCF::Replication::MediaRTPPJMedia::V1::RtpStateReplicatorPrx>&,
+ const ConfigurationServiceImplPtr&);
+
+ RTPSessionImpl(const Ice::ObjectAdapterPtr& adapter,
+ const string& sessionIdentity,
+ const PJMediaEnvironmentPtr& env,
+ Ice::Int port,
+ const AsteriskSCF::Media::V1::FormatSeq& formats,
+ bool isIPv6,
+ bool srtp,
+ const AsteriskSCF::Discovery::SmartProxy<AsteriskSCF::Replication::MediaRTPPJMedia::V1::RtpStateReplicatorPrx>&,
+ const ConfigurationServiceImplPtr& configurationServant);
+
+ ~RTPSessionImpl();
/**
- * AsteriskSCF::Media::V1::RSession implementation.
- * A pointer to the object adapter that objects should be added to.
++ * AsteriskSCF::Media::V1::Session implementation.
*/
- Ice::ObjectAdapterPtr mAdapter;
+ AsteriskSCF::Media::V1::StreamSourceSeq getSources(const Ice::Current&);
+ AsteriskSCF::Media::V1::StreamSinkSeq getSinks(const Ice::Current&);
+ std::string getId(const Ice::Current&);
+ void useRTCP(bool, const Ice::Current&);
+ AsteriskSCF::Media::RTP::V1::RTCPSessionPrx getRTCPSession(const Ice::Current&);
+ void release(const Ice::Current&);
+ void associatePayloads(const AsteriskSCF::Media::RTP::V1::PayloadMap&, const Ice::Current&);
/**
- * A sequence of formats that the session is expected to carry.
+ * AsteriskSCF::Media::RTP::V1::SRTPSession implementation.
*/
- FormatSeq mFormats;
+ void setOptions(const string& suiteName, const string& keyInfo, bool enableAuthentication, bool enableEncryption, const Ice::Current&);
+ void start(const string& suiteName, const string& keyInfo, bool enableAuthentication, bool enableEncryption, const Ice::Current&);
+
+ /**
+ * Internal methods.
+ */
+ AsteriskSCF::Media::V1::FormatSeq getFormats();
+ void setRemoteDetails(const std::string& address, Ice::Int port);
+ AsteriskSCF::Media::V1::FormatPtr getFormat(int payload);
+ int getPayload(const AsteriskSCF::Media::V1::FormatPtr& mediaformat);
/**
- * A proxy to ourselves.
+ * Accessors for the source and sink servants.
*/
- RTPSessionPrx mProxy;
+ StreamSourceRTPImplPtr getSourceServant();
+ StreamSinkRTPImplPtr getSinkServant();
+
+ void replicateState(const AsteriskSCF::Replication::MediaRTPPJMedia::V1::RtpSessionStateItemPtr&,
+ const AsteriskSCF::Replication::MediaRTPPJMedia::V1::RtpStreamSinkStateItemPtr&,
+ const AsteriskSCF::Replication::MediaRTPPJMedia::V1::RtpStreamSourceStateItemPtr&);
+ void removeState(const AsteriskSCF::Replication::MediaRTPPJMedia::V1::RtpSessionStateItemPtr&,
+ const AsteriskSCF::Replication::MediaRTPPJMedia::V1::RtpStreamSinkStateItemPtr&,
+ const AsteriskSCF::Replication::MediaRTPPJMedia::V1::RtpStreamSourceStateItemPtr&);
+
+ void associatePayloadsImpl(const AsteriskSCF::Media::RTP::V1::PayloadMap& payloadMap);
+
+ RTPSessionPrx activate(const string& id);
+ RTPSessionPrx activate(const Ice::Identity& id, const Ice::Identity& sourceId, const Ice::Identity& sinkId);
+
+ void destroy();
+
+private:
+
+ boost::shared_mutex mLock;
+
+ /**
+ * Instance of the session adapter to be passed to sinks, sources, configuration, etc.
+ */
+ SessionAdapterPtr mSessionAdapter;
+
+ /**
+ * The current pjmedia related environment.
+ */
+ PJMediaEnvironmentPtr mEnvironment;
/**
* pjmedia endpoint for our media.
@@@ -610,147 -512,6 +610,147 @@@ void RTPSessionImpl::removeState(const
}
catch (...)
{
- mImpl->mStateReplicator->removeState(items);
+ mStateReplicator->removeState(items);
+ }
+}
+
+void RTPSessionImpl::associatePayloadsImpl(const AsteriskSCF::Media::RTP::V1::PayloadMap& mappings)
+{
+ for (PayloadMap::const_iterator it = mappings.begin(); it != mappings.end(); ++it)
+ {
+ mFormatstoPayloads.insert(make_pair((*it).second->name, (*it).first));
+ }
+}
+
+RTPSessionPrx RTPSessionImpl::activate(const string& id)
+{
+ assert(id == mId);
+ Ice::Identity sourceId(mAdapter->getCommunicator()->stringToIdentity(IceUtil::generateUUID()));
+ Ice::Identity sinkId(mAdapter->getCommunicator()->stringToIdentity(IceUtil::generateUUID()));
+ return activate(mAdapter->getCommunicator()->stringToIdentity(id), sourceId, sinkId);
+}
+
+RTPSessionPrx RTPSessionImpl::activate(const Ice::Identity& id, const Ice::Identity& sourceId,
+ const Ice::Identity& sinkId)
+{
+ mSessionAdapter.reset(new SessionAdapterImpl(this));
+ try
+ {
+ mStreamSource = new StreamSourceRTPImpl(mSessionAdapter, mTransport, mId);
+ mStreamSink = new StreamSinkRTPImpl(mSessionAdapter, mTransport, mId);
+ mStreamSourceProxy = StreamSourceRTPPrx::uncheckedCast(mAdapter->add(mStreamSource, sourceId));
+ mStreamSinkProxy = StreamSinkRTPPrx::uncheckedCast(mAdapter->add(mStreamSink, sinkId));
+
+ if (mSessionStateItem)
+ {
+ mSessionStateItem->key = mSessionStateItem->mSessionId = mId;
+ mSessionStateItem->mSessionIdentity = id;
+ mSessionStateItem->mFormats = mFormats;
+ mSessionStateItem->mSourceIdentity = sourceId;
+ mSessionStateItem->mSinkIdentity = sinkId;
+ if (mTransport->localAddress())
+ {
+ mSessionStateItem->mPort = mTransport->localAddress()->port();
+ }
+ else
+ {
+ mSessionStateItem->mPort = 0;
+ }
+ replicateState(mSessionStateItem, mStreamSink->getStateItem(), mStreamSource->getStateItem());
+ }
+ RTPSessionPrx result = RTPSessionPrx::uncheckedCast(mAdapter->add(this, id));
+ mTransport->addFacets(mAdapter, id);
+ return result;
+ }
+ catch (...)
+ {
+ mSessionAdapter.reset();
+ throw;
+ }
+ return RTPSessionPrx(); // For compilers that complain about such things.
+}
+
+void RTPSessionImpl::destroy()
+{
+ /* Drop the source and sink from the ASM */
+ mAdapter->remove(mStreamSourceProxy->ice_getIdentity());
+ mAdapter->remove(mStreamSinkProxy->ice_getIdentity());
+
+ /* Since both the source and sink have a pointer back to the session we need to get rid of them,
+ * which will in turn get rid of ourselves once we are removed from the ASM.
+ */
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mLock);
+ mStreamSource = 0;
+ mStreamSink = 0;
+ mSessionAdapter.reset();
}
+
+ /* All we have to do is remove ourselves from the ASM, our smart pointerness will cause us to
+ * destruct and then cleanup will occur.
+ */
+ mAdapter->remove(mAdapter->getCommunicator()->stringToIdentity(mId));
}
+
+class ReplicationAdapterImpl : public ReplicationAdapter
+{
+public:
+
+ void update(const RtpSessionStateItemPtr& item)
+ {
+ mImpl->associatePayloadsImpl(item->mPayloadstoFormats);
+ }
+
+ void update(const RtpStreamSinkStateItemPtr& item)
+ {
+ mImpl->getSinkServant()->setSourceImpl(item->mSource);
+ mImpl->getSinkServant()->setRemoteDetailsImpl(item->mRemoteAddress, item->mRemotePort);
+ mImpl->getSourceServant()->setRemoteDetails(item->mRemoteAddress, item->mRemotePort);
+ }
+
+ void update(const RtpStreamSourceStateItemPtr& item)
+ {
- mImpl->getSourceServant()->setSinkImpl(item->mSink);
++ mImpl->getSourceServant()->setSinksImpl(item->mSinks);
+ }
+
+ void destroy()
+ {
+ mImpl->destroy();
+ }
+
+ ReplicationAdapterImpl(const RTPSessionImplPtr& impl) :
+ mImpl(impl)
+ {
+ }
+private:
+
+ RTPSessionImplPtr mImpl;
+};
+
+RTPSessionPrx AsteriskSCF::PJMediaRTP::RTPSession::create(const Ice::ObjectAdapterPtr& adapter,
+ const std::string& id, const RTPServiceLocatorParamsPtr& params,
+ const PJMediaEnvironmentPtr& environment,
+ const AsteriskSCF::System::Component::V1::ReplicaPrx& replicaControl,
+ const AsteriskSCF::Discovery::SmartProxy<RtpStateReplicatorPrx>& stateReplicator,
+ const ConfigurationServiceImplPtr& configuration)
+{
+ RTPSessionImplPtr servant(new RTPSessionImpl(adapter, id, environment, params,
+ replicaControl, stateReplicator, configuration));
+ return servant->activate(id);
+}
+
+ReplicationAdapterPtr AsteriskSCF::PJMediaRTP::RTPSession::create(const Ice::ObjectAdapterPtr& adapter,
+ const PJMediaEnvironmentPtr& environment,
+ const RtpSessionStateItemPtr& item,
+ const ConfigurationServiceImplPtr& configuration)
+{
+ RTPSessionImplPtr servant(new RTPSessionImpl(adapter,
+ adapter->getCommunicator()->identityToString(item->mSessionIdentity),
+ environment,
+ item->mPort, item->mFormats, item->mIPv6, item->mSRTP,
+ AsteriskSCF::Discovery::SmartProxy<RtpStateReplicatorPrx>(),
+ configuration));
+ servant->activate(item->mSessionIdentity, item-> mSourceIdentity, item->mSinkIdentity);
+ return ReplicationAdapterPtr(new ReplicationAdapterImpl(servant));
+}
+
diff --cc src/RTPSink.cpp
index d1cd962,7c65568..2a54fb3
--- a/src/RTPSink.cpp
+++ b/src/RTPSink.cpp
@@@ -119,7 -108,7 +119,7 @@@ void StreamSinkRTPImpl::write(const Ast
int payload;
// Only allow media formats through that we support
- if ((payload = mImpl->mSessionAdapter->getPayload((*frame)->mediaformat)) < 0)
- if ((payload = mImpl->mSession->getPayload((*frame)->mediaFormat)) < 0)
++ if ((payload = mImpl->mSessionAdapter->getPayload((*frame)->mediaFormat)) < 0)
{
throw UnsupportedMediaFormatException();
}
diff --cc src/RTPSource.cpp
index 9854098,c7c9c58..303ef7d
--- a/src/RTPSource.cpp
+++ b/src/RTPSource.cpp
@@@ -14,11 -14,9 +14,12 @@@
* at the top of the source tree.
*/
+#include "RTPSource.h"
+#include "RtpStateReplicationIf.h"
+
#include <pjlib.h>
#include <pjmedia.h>
+
#include <Ice/Ice.h>
#include <IceUtil/UUID.h>
@@@ -73,7 -70,10 +76,11 @@@ public
*/
RtpStreamSourceStateItemPtr mSourceStateItem;
+ string mSessionId;
+ /**
+ * Lock that protects the state held by this object (e.g. the sink list in mSourceStateItem).
+ */
+ boost::shared_mutex mLock;
};
/**
@@@ -101,13 -96,29 +108,29 @@@ StreamSourceRTPImpl::StreamSourceRTPImp
}
/**
- * Implementation of the setSink method as defined in MediaIf.ice
+ * Implementation of the addSink method as defined in MediaIf.ice
+ */
+ void StreamSourceRTPImpl::addSink(const AsteriskSCF::Media::V1::StreamSinkPrx& sink, const Ice::Current&)
+ {
+ boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
+
+ mImpl->mSourceStateItem->mSinks.push_back(sink);
+
- mImpl->mSession->replicateState(0, 0, mImpl->mSourceStateItem);
++ mImpl->mSessionAdapter->replicateState(mImpl->mSourceStateItem);
+ }
+
+ /**
+ * Implementation of the removeSink method as defined in MediaIf.ice
*/
- void StreamSourceRTPImpl::setSink(const AsteriskSCF::Media::V1::StreamSinkPrx& sink, const Ice::Current&)
+ void StreamSourceRTPImpl::removeSink(const AsteriskSCF::Media::V1::StreamSinkPrx& sink, const Ice::Current&)
{
- mImpl->mSourceStateItem->mSink = sink;
+ boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
+
+ mImpl->mSourceStateItem->mSinks.erase(std::remove(mImpl->mSourceStateItem->mSinks.begin(),
+ mImpl->mSourceStateItem->mSinks.end(),
+ sink), mImpl->mSourceStateItem->mSinks.end());
- mImpl->mSession->replicateState(0, 0, mImpl->mSourceStateItem);
+ mImpl->mSessionAdapter->replicateState(mImpl->mSourceStateItem);
}
/**
@@@ -200,40 -212,72 +223,72 @@@ static void receiveRTP(void *userdata,
return;
}
- if (source->mImpl->mSourceStateItem->mSink != 0)
+ // Update RTP stack information before writing to sinks; it's fine to do it even if no sinks are present
+ pjmedia_rtp_session_update(&source->mImpl->mIncomingSession, header, NULL);
+
+ if (source->mImpl->mSourceStateItem->mSinks.empty())
+ {
+ // No sinks present so frames can not go anywhere
+ return;
+ }
+
- FormatPtr mediaformat = source->mImpl->mSession->getFormat(header->pt);
++ FormatPtr mediaformat = source->mImpl->mSessionAdapter->getFormat(header->pt);
+
+ if (!mediaformat)
{
- FormatPtr mediaformat = source->mImpl->mSessionAdapter->getFormat(header->pt);
+ // If this is for a payload we don't know about just drop the frame
+ return;
+ }
- if (mediaformat != 0)
+ FrameSeq frames;
+
+ AudioFormatPtr audioformat;
+ VideoFormatPtr videoformat;
+
+ if ((audioformat = AudioFormatPtr::dynamicCast(mediaformat)))
+ {
+ AudioFramePtr frame = new AudioFrame();
+ frame->mediaFormat = mediaformat;
+
+ // Populate the common data
+ frame->timestamp = header->ts;
+ frame->seqno = header->seq;
+
+ // Copy the payload from the RTP packet into the frame
+ copy(payload, payload + payload_size, std::back_inserter(frame->payload));
+
+ // Into the frames sequence it goes
+ frames.push_back(frame);
+ }
+ else if ((videoformat = VideoFormatPtr::dynamicCast(mediaformat)))
+ {
+ VideoFramePtr frame = new VideoFrame();
+ frame->mediaFormat = mediaformat;
+ frame->timestamp = header->ts;
+ frame->seqno = header->seq;
+ copy(payload, payload + payload_size, std::back_inserter(frame->payload));
+ frames.push_back(frame);
+ }
+
+ if (frames.empty())
+ {
+ // If the media format ended up being a type we don't understand don't bother writing it out
+ return;
+ }
+
+ boost::shared_lock<boost::shared_mutex> lock(source->mImpl->mLock);
+
+ for (StreamSinkSeq::iterator sink = source->mImpl->mSourceStateItem->mSinks.begin();
+ sink != source->mImpl->mSourceStateItem->mSinks.end();
+ ++sink)
+ {
+ try
+ {
+ (*sink)->write(frames);
+ }
+ catch (const Ice::Exception&)
{
- FrameSeq frames;
-
- AudioFormatPtr audioformat;
-
- if ((audioformat = AudioFormatPtr::dynamicCast(mediaformat)))
- {
- AudioFramePtr frame = new AudioFrame();
- frame->mediaformat = mediaformat;
-
- /* Populate the common data */
- frame->timestamp = header->ts;
- frame->seqno = header->seq;
-
- /* Copy the payload from the RTP packet to the frame, yahoo! */
- copy(payload, payload + payload_size, std::back_inserter(frame->payload));
-
- /* Into the sequence it goes, yarrrrrrrrrr matey */
- frames.push_back(frame);
- }
-
- try
- {
- source->mImpl->mSourceStateItem->mSink->write(frames);
- }
- catch (const Ice::Exception&)
- {
- lg(Error) << "Exception caught while attempting to write media to an RTP sink";
- }
+ lg(Error) << "Exception caught while attempting to write media to RTP sink " << (*sink);
}
}
@@@ -293,7 -335,12 +346,8 @@@ RtpStreamSourceStateItemPtr StreamSourc
return mImpl->mSourceStateItem;
}
- void StreamSourceRTPImpl::setSinkImpl(const AsteriskSCF::Media::V1::StreamSinkPrx& proxy)
-/**
- * API call which sets the sinks.
- */
-void StreamSourceRTPImpl::setSinks(const AsteriskSCF::Media::V1::StreamSinkSeq& sinks)
++void StreamSourceRTPImpl::setSinksImpl(const AsteriskSCF::Media::V1::StreamSinkSeq& sinks)
{
- mImpl->mSourceStateItem->mSink = proxy;
+ boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
-
+ mImpl->mSourceStateItem->mSinks = sinks;
}
diff --cc src/RTPSource.h
index 093eb6b,f032f88..f100cd4
--- a/src/RTPSource.h
+++ b/src/RTPSource.h
@@@ -24,22 -21,20 +24,25 @@@ class StreamSourceRTPImplPriv
class StreamSourceRTPImpl : public AsteriskSCF::Media::RTP::V1::StreamSourceRTP
{
public:
- StreamSourceRTPImpl(const RTPSessionImplPtr&, const std::string&);
+ StreamSourceRTPImpl(const AsteriskSCF::PJMediaRTP::SessionAdapterPtr& sessionAdapter,
+ const AsteriskSCF::PJMediaRTP::PJMediaTransportPtr& transport,
+ const std::string& parentSessionId);
+
- void setSink(const AsteriskSCF::Media::V1::StreamSinkPrx&, const Ice::Current&);
- AsteriskSCF::Media::V1::StreamSinkPrx getSink(const Ice::Current&);
+ void addSink(const AsteriskSCF::Media::V1::StreamSinkPrx&, const Ice::Current&);
+ void removeSink(const AsteriskSCF::Media::V1::StreamSinkPrx&, const Ice::Current&);
+ AsteriskSCF::Media::V1::StreamSinkSeq getSinks(const Ice::Current&);
AsteriskSCF::Media::V1::FormatSeq getFormats(const Ice::Current&);
std::string getId(const Ice::Current&);
void requestFormat(const AsteriskSCF::Media::V1::FormatPtr&, const Ice::Current&);
std::string getLocalAddress(const Ice::Current&);
Ice::Int getLocalPort(const Ice::Current&);
+
void setRemoteDetails(const std::string& address, Ice::Int port);
- void setSinks(const AsteriskSCF::Media::V1::StreamSinkSeq&);
++ void setSinksImpl(const AsteriskSCF::Media::V1::StreamSinkSeq&);
AsteriskSCF::Replication::MediaRTPPJMedia::V1::RtpStreamSourceStateItemPtr getStateItem();
+ void setSinkImpl(const AsteriskSCF::Media::V1::StreamSinkPrx& proxy);
+
/**
* Private implementation data for StreamSourceRTPImpl.
* Note: This is public on purpose so that our RTP callback can access it.
-----------------------------------------------------------------------
--
asterisk-scf/integration/media_rtp_pjmedia.git
More information about the asterisk-scf-commits
mailing list