[asterisk-scf-commits] asterisk-scf/integration/media_rtp_pjmedia.git branch "srtp-support" updated.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Fri Jul 15 14:29:41 CDT 2011


branch "srtp-support" has been updated
       via  e1a98078658030c9716217c849dc2bc960ae05f7 (commit)
       via  e495e075f2f28738b0d8236dd76b353316240f41 (commit)
      from  b4ebf6392981bbfbf8d5a31a286feac32cd82903 (commit)

Summary of changes:
 .../MediaRTPPJMedia/RtpStateReplicationIf.ice      |    4 +-
 src/RTPConfiguration.cpp                           |    4 +-
 src/RTPSession.cpp                                 |    4 +-
 src/RTPSink.cpp                                    |    9 +-
 src/RTPSource.cpp                                  |  136 ++++++++++++++------
 src/RTPSource.h                                    |    7 +-
 test/TestRTPpjmedia.cpp                            |   20 ++--
 7 files changed, 120 insertions(+), 64 deletions(-)


- Log -----------------------------------------------------------------
commit e1a98078658030c9716217c849dc2bc960ae05f7
Merge: b4ebf63 e495e07
Author: Brent Eagles <beagles at digium.com>
Date:   Fri Jul 15 14:52:30 2011 -0230

    Merge branch 'master' into srtp-support with resolved conflicts.

diff --cc slice/AsteriskSCF/Replication/MediaRTPPJMedia/RtpStateReplicationIf.ice
index 202c998,b9b08d7..ff9d0d2
--- a/slice/AsteriskSCF/Replication/MediaRTPPJMedia/RtpStateReplicationIf.ice
+++ b/slice/AsteriskSCF/Replication/MediaRTPPJMedia/RtpStateReplicationIf.ice
@@@ -78,9 -78,6 +78,9 @@@ module V
  	string mComparatorId;
      };
  
 +    /**
-      * TODO: Ice classe members shouldn't have m prefixes.
++     * TODO: Data members in Slice defined classes should not have `m' prefixes.
 +     */
      class RtpSessionStateItem extends RtpStateItem
      {
  	Ice::Identity mSessionIdentity;
diff --cc src/RTPConfiguration.cpp
index 378b569,5e8ee26..1359fe4
--- a/src/RTPConfiguration.cpp
+++ b/src/RTPConfiguration.cpp
@@@ -14,142 -14,23 +14,142 @@@
   * at the top of the source tree.
   */
  
 +#include "RtpConfigurationIf.h"
 +#include "RTPConfiguration.h"
 +
  #include <IceUtil/UUID.h>
 +#include <AsteriskSCF/System/Component/ConfigurationIf.h>
  
  #include <boost/thread.hpp>
 -#include <boost/shared_ptr.hpp>
  #include <boost/thread/shared_mutex.hpp>
  
 -#include <AsteriskSCF/System/Component/ConfigurationIf.h>
 -
 -#include "RtpConfigurationIf.h"
 -#include "RTPConfiguration.h"
 -
  using namespace AsteriskSCF::System::Configuration::V1;
 +using namespace AsteriskSCF::PJMediaRTP;
 +using namespace std;
  using namespace AsteriskSCF::Configuration::MediaRTPPJMedia::V1;
  
 -class ConfigurationServiceImplPriv
 +/**
 + * Implementation of the configuration service.
 + */
 +class ConfigurationServiceServant : virtual public ConfigurationServiceImpl
  {
  public:
 +
 +    /**
 +     * AsteriskSCF::System::Configuration::V1 interface. Slice to C++ mapping.
 +     */
 +    AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq
 +    getConfiguration(const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq&, const Ice::Current&);
 +    
 +    AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq
 +    getConfigurationAll(const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq&, const Ice::Current&);
 +    
 +    AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq
 +    getConfigurationGroups(const Ice::Current&);
 +
 +    void setConfiguration(const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq&, const Ice::Current&);
 +
 +    void removeConfigurationItems(const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq&,
 +            const Ice::Current&);
 +    void removeConfigurationGroups(const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq&,
 +            const Ice::Current&);
 +
 +    /**
 +     * AsteriskSCF::PJMediaRTP::RTPConfiguration interface.
 +     */
 +    int getStartPort();
 +    int getEndPort();
 +    int getWorkerThreadCount();
 +
 +    std::string getBindIPv4Address();
 +    std::string getBindIPv6Address();
 +
 +    PJLibConfigurationPtr libConfig() const
 +    {
 +        return PJLibConfigurationPtr(); // XXX this isn't implemented here at this time.
 +    }
 +
 +    NATConfigPtr natConfig() const
 +    {
 +        return mNATConfig;
 +    }
 +
 +    ICEConfigurationPtr ICEConfig() const
 +    {
 +        return mICEConfig;
 +    }
 +
 +    SRTPConfigurationPtr srtpConfig() const
 +    {
 +        return mSRTPConfig;
 +    }
 +
 +    ConfigurationServicePrx activate(const Ice::ObjectAdapterPtr& objectAdapter, const string& id);
 +
 +    void replaceConfig(const ICEConfigurationPtr& newConfig)
 +    {
 +        mICEConfig = newConfig;
 +    }
 +    
 +    void replaceConfig(const NATConfigPtr& newConfig)
 +    {
 +        mNATConfig = newConfig;
 +    }
 +    
 +    void replaceConfig(const SRTPConfigurationPtr& newConfig)
 +    {
 +        mSRTPConfig = newConfig;
 +    }
 +
 +    RtpGeneralGroupPtr getGeneralGroup()
 +    {
 +        return mGeneralGroup;
 +    }
 +
 +    RTPICEConfigurationGroupPtr getICEConfigurationGroup()
 +    {
 +        return mICEConfiguration;
 +    }
 +
 +    void replaceGroup(const RtpGeneralGroupPtr& group)
 +    {
 +        mGeneralGroup = group;
 +
 +        //
-         // There is most likely a lock already held on the configuration data
++        // replaceGroup methods are only called within the scope of an existing lock so we do not need one
++        // here.
 +        //
- 
 +        if (mGeneralGroup)
 +        {
 +            ConfigurationItemDict::const_iterator item = 
 +                mGeneralGroup->configurationItems.find(EnableSRTPItemName);
 +            if (item != mGeneralGroup->configurationItems.end())
 +            {
 +                EnableSRTPItemPtr srtpItem = EnableSRTPItemPtr::dynamicCast(item->second);
 +                if (!mSRTPConfig)
 +                {
 +                    mSRTPConfig = SRTPConfiguration::create(srtpItem->enabled);
 +                }
 +                else
 +                {
 +                    //
 +                    // No point in allocating if it's the same.
 +                    //Item
 +                    if (mSRTPConfig->isSRTPEnabled() != srtpItem->enabled)
 +                    {
 +                        mSRTPConfig = SRTPConfiguration::create(srtpItem->enabled);
 +                    }
 +                }
 +            }
 +        }
 +    }
 +
 +    void replaceGroup(const RTPICEConfigurationGroupPtr& group)
 +    {
 +        mICEConfiguration = group;
 +    }
 +
 +private:
      /**
       * General RTP configuration
       */
diff --cc src/RTPSession.cpp
index bf5258d,05ce690..3fecc5c
--- a/src/RTPSession.cpp
+++ b/src/RTPSession.cpp
@@@ -42,92 -39,45 +42,92 @@@ using namespace AsteriskSCF::Media::RTP
  using namespace AsteriskSCF::Replication::MediaRTPPJMedia::V1;
  using namespace AsteriskSCF::System::Component::V1;
  using namespace AsteriskSCF::Discovery;
 +using namespace AsteriskSCF::PJMediaRTP;
  
  /**
 - * Default value for where we should start allocating RTP and RTCP ports from.
 - */
 -#define DEFAULT_RTP_PORT_MINIMUM 10000
 -
 -/**
 - * Default value for where we should stop allocating RTP and RTCP ports.
 + * Implementation of the RTPSession interface as defined in MediaRTPIf.ice
   */
 -#define DEFAULT_RTP_PORT_MAXIMUM 20000
 -
 -/**
 - * Private implementation details for the RTPSessionImpl class.
 - */
 -class RTPSessionImplPriv
 +class RTPSessionImpl : public AsteriskSCF::Media::RTP::V1::SRTPSession
  {
  public:
 -    RTPSessionImplPriv(const Ice::ObjectAdapterPtr& adapter, const FormatSeq& formats,
 -	const ReplicaPtr& replicaService,
 -	const AsteriskSCF::Discovery::SmartProxy<RtpStateReplicatorPrx>& stateReplicator) :
 -	mAdapter(adapter), mFormats(formats),
 -        mSessionStateItem(new RtpSessionStateItem()),
 -        mReplicaService(replicaService), mStateReplicator(stateReplicator) { };
 -    ~RTPSessionImplPriv();
 +    RTPSessionImpl(const Ice::ObjectAdapterPtr&, 
 +            const string& id,
 +            const PJMediaEnvironmentPtr& env,
 +            const AsteriskSCF::Media::RTP::V1::RTPServiceLocatorParamsPtr& params,
 +            const AsteriskSCF::System::Component::V1::ReplicaPrx& replicaControl, 
 +	    const AsteriskSCF::Discovery::SmartProxy<AsteriskSCF::Replication::MediaRTPPJMedia::V1::RtpStateReplicatorPrx>&,
 +	    const ConfigurationServiceImplPtr&);
 +    
 +    RTPSessionImpl(const Ice::ObjectAdapterPtr& adapter,
 +            const string& sessionIdentity,    
 +            const PJMediaEnvironmentPtr& env,
 +            Ice::Int port,
 +            const AsteriskSCF::Media::V1::FormatSeq& formats,
 +            bool isIPv6,
 +            bool srtp,
 +            const AsteriskSCF::Discovery::SmartProxy<AsteriskSCF::Replication::MediaRTPPJMedia::V1::RtpStateReplicatorPrx>&,
 +            const ConfigurationServiceImplPtr& configurationServant);
 +
 +    ~RTPSessionImpl();
  
      /**
-      * AsteriskSCF::Media::V1::RSession implementation.
 -     * A pointer to the object adapter that objects should be added to.
++     * AsteriskSCF::Media::V1::Session implementation.
       */
 -    Ice::ObjectAdapterPtr mAdapter;
 +    AsteriskSCF::Media::V1::StreamSourceSeq getSources(const Ice::Current&);
 +    AsteriskSCF::Media::V1::StreamSinkSeq getSinks(const Ice::Current&);
 +    std::string getId(const Ice::Current&);
 +    void useRTCP(bool, const Ice::Current&);
 +    AsteriskSCF::Media::RTP::V1::RTCPSessionPrx getRTCPSession(const Ice::Current&);
 +    void release(const Ice::Current&);
 +    void associatePayloads(const AsteriskSCF::Media::RTP::V1::PayloadMap&, const Ice::Current&);
  
      /**
 -     * A sequence of formats that the session is expected to carry.
 +     * AsteriskSCF::Media::V1::SRTPSession implementation.
       */
 -    FormatSeq mFormats;
 +    void setOptions(const string& suiteName, const string& keyInfo, bool enableAuthentication, bool enableEncryption, const Ice::Current&);
 +    void start(const string& suiteName, const string& keyInfo, bool enableAuthentication, bool enableEncryption, const Ice::Current&);
 +
 +    /**
 +     * Internal methods. 
 +     */
 +    AsteriskSCF::Media::V1::FormatSeq getFormats();
 +    void setRemoteDetails(const std::string& address, Ice::Int port);
 +    AsteriskSCF::Media::V1::FormatPtr getFormat(int payload);
 +    int getPayload(const AsteriskSCF::Media::V1::FormatPtr& mediaformat);
  
      /**
 -     * A proxy to ourselves.
 +     * Accessors for the source and sink servants. 
       */
 -    RTPSessionPrx mProxy;
 +    StreamSourceRTPImplPtr getSourceServant();
 +    StreamSinkRTPImplPtr getSinkServant();
 +
 +    void replicateState(const AsteriskSCF::Replication::MediaRTPPJMedia::V1::RtpSessionStateItemPtr&,
 +            const AsteriskSCF::Replication::MediaRTPPJMedia::V1::RtpStreamSinkStateItemPtr&,
 +            const AsteriskSCF::Replication::MediaRTPPJMedia::V1::RtpStreamSourceStateItemPtr&);
 +    void removeState(const AsteriskSCF::Replication::MediaRTPPJMedia::V1::RtpSessionStateItemPtr&,
 +            const AsteriskSCF::Replication::MediaRTPPJMedia::V1::RtpStreamSinkStateItemPtr&,
 +            const AsteriskSCF::Replication::MediaRTPPJMedia::V1::RtpStreamSourceStateItemPtr&);
 +
 +    void associatePayloadsImpl(const AsteriskSCF::Media::RTP::V1::PayloadMap& payloadMap);
 +
 +    RTPSessionPrx activate(const string& id);
 +    RTPSessionPrx activate(const Ice::Identity& id, const Ice::Identity& sourceId, const Ice::Identity& sinkId);
 +
 +    void destroy();
 +
 +private:
 +
 +    boost::shared_mutex mLock;
 +
 +    /**
 +     * Instance of the session adapter to be passed to sinks, sources, configuration, etc.
 +     */
 +    SessionAdapterPtr mSessionAdapter;
 +
 +    /**
 +     * The current pjmedia related environment.
 +     */
 +    PJMediaEnvironmentPtr mEnvironment;
  
      /**
       * pjmedia endpoint for our media.
@@@ -610,147 -512,6 +610,147 @@@ void RTPSessionImpl::removeState(const 
      }
      catch (...)
      {
 -	mImpl->mStateReplicator->removeState(items);
 +	mStateReplicator->removeState(items);
 +    }
 +}
 +
 +void RTPSessionImpl::associatePayloadsImpl(const AsteriskSCF::Media::RTP::V1::PayloadMap& mappings)
 +{
 +    for (PayloadMap::const_iterator it = mappings.begin(); it != mappings.end(); ++it)
 +    {
 +        mFormatstoPayloads.insert(make_pair((*it).second->name, (*it).first));
 +    }
 +}
 +
 +RTPSessionPrx RTPSessionImpl::activate(const string& id)
 +{
 +    assert(id == mId);
 +    Ice::Identity sourceId(mAdapter->getCommunicator()->stringToIdentity(IceUtil::generateUUID()));
 +    Ice::Identity sinkId(mAdapter->getCommunicator()->stringToIdentity(IceUtil::generateUUID()));
 +    return activate(mAdapter->getCommunicator()->stringToIdentity(id), sourceId, sinkId);
 +}
 +
 +RTPSessionPrx RTPSessionImpl::activate(const Ice::Identity& id, const Ice::Identity& sourceId,
 +        const Ice::Identity& sinkId)
 +{
 +    mSessionAdapter.reset(new SessionAdapterImpl(this));
 +    try
 +    {
 +        mStreamSource = new StreamSourceRTPImpl(mSessionAdapter, mTransport, mId);
 +        mStreamSink = new StreamSinkRTPImpl(mSessionAdapter, mTransport, mId);
 +        mStreamSourceProxy = StreamSourceRTPPrx::uncheckedCast(mAdapter->add(mStreamSource, sourceId));
 +        mStreamSinkProxy = StreamSinkRTPPrx::uncheckedCast(mAdapter->add(mStreamSink, sinkId));
 +
 +        if (mSessionStateItem)
 +        {
 +            mSessionStateItem->key = mSessionStateItem->mSessionId = mId;
 +            mSessionStateItem->mSessionIdentity = id;
 +            mSessionStateItem->mFormats = mFormats;
 +            mSessionStateItem->mSourceIdentity = sourceId;
 +            mSessionStateItem->mSinkIdentity = sinkId;
 +            if (mTransport->localAddress())
 +            {
 +                mSessionStateItem->mPort = mTransport->localAddress()->port();
 +            }
 +            else
 +            {
 +                mSessionStateItem->mPort = 0;
 +            }
 +            replicateState(mSessionStateItem, mStreamSink->getStateItem(), mStreamSource->getStateItem());
 +        }
 +        RTPSessionPrx result = RTPSessionPrx::uncheckedCast(mAdapter->add(this, id));
 +        mTransport->addFacets(mAdapter, id);
 +        return result;
 +    }
 +    catch (...)
 +    {
 +        mSessionAdapter.reset();
 +        throw;
 +    }
 +    return RTPSessionPrx(); // For compilers that complain about such things.
 +}
 +
 +void RTPSessionImpl::destroy()
 +{
 +    /* Drop the source and sink from the ASM */
 +    mAdapter->remove(mStreamSourceProxy->ice_getIdentity());
 +    mAdapter->remove(mStreamSinkProxy->ice_getIdentity());
 +
 +    /* Since both the source and sink have a pointer back to the session we need to get rid of them,
 +     * which will in turn get rid of ourselves once we are removed from the ASM.
 +     */
 +    {
 +        boost::unique_lock<boost::shared_mutex> lock(mLock);
 +        mStreamSource = 0;
 +        mStreamSink = 0;
 +        mSessionAdapter.reset();
      }
 +
 +    /* All we have to do is remove ourselves from the ASM, our smart pointerness will cause us to
 +     * destruct and then cleanup will occur.
 +     */
 +    mAdapter->remove(mAdapter->getCommunicator()->stringToIdentity(mId));
  }
 +
 +class ReplicationAdapterImpl : public ReplicationAdapter
 +{
 +public:
 +
 +    void update(const RtpSessionStateItemPtr& item)
 +    {
 +        mImpl->associatePayloadsImpl(item->mPayloadstoFormats);
 +    }
 +
 +    void update(const RtpStreamSinkStateItemPtr& item)
 +    {
 +        mImpl->getSinkServant()->setSourceImpl(item->mSource);
 +        mImpl->getSinkServant()->setRemoteDetailsImpl(item->mRemoteAddress, item->mRemotePort);
 +        mImpl->getSourceServant()->setRemoteDetails(item->mRemoteAddress, item->mRemotePort);
 +    }
 +
 +    void update(const RtpStreamSourceStateItemPtr& item)
 +    {
-         mImpl->getSourceServant()->setSinkImpl(item->mSink);
++        mImpl->getSourceServant()->setSinksImpl(item->mSinks);
 +    }
 +
 +    void destroy()
 +    {
 +        mImpl->destroy();
 +    }
 +
 +    ReplicationAdapterImpl(const RTPSessionImplPtr& impl) :
 +        mImpl(impl)
 +    {
 +    }
 +private:
 +
 +    RTPSessionImplPtr mImpl;
 +};
 +
 +RTPSessionPrx AsteriskSCF::PJMediaRTP::RTPSession::create(const Ice::ObjectAdapterPtr& adapter,
 +        const std::string& id, const RTPServiceLocatorParamsPtr& params,
 +        const PJMediaEnvironmentPtr& environment, 
 +        const AsteriskSCF::System::Component::V1::ReplicaPrx& replicaControl,
 +        const AsteriskSCF::Discovery::SmartProxy<RtpStateReplicatorPrx>& stateReplicator,
 +        const ConfigurationServiceImplPtr& configuration)
 +{
 +    RTPSessionImplPtr servant(new RTPSessionImpl(adapter, id, environment, params,
 +                    replicaControl, stateReplicator, configuration));
 +    return servant->activate(id);
 +}
 +
 +ReplicationAdapterPtr AsteriskSCF::PJMediaRTP::RTPSession::create(const Ice::ObjectAdapterPtr& adapter,
 +  const PJMediaEnvironmentPtr& environment, 
 +  const RtpSessionStateItemPtr& item,
 +  const ConfigurationServiceImplPtr& configuration)
 +{
 +    RTPSessionImplPtr servant(new RTPSessionImpl(adapter, 
 +                    adapter->getCommunicator()->identityToString(item->mSessionIdentity), 
 +                    environment,
 +                    item->mPort, item->mFormats, item->mIPv6, item->mSRTP,
 +                    AsteriskSCF::Discovery::SmartProxy<RtpStateReplicatorPrx>(), 
 +                    configuration));
 +    servant->activate(item->mSessionIdentity, item-> mSourceIdentity, item->mSinkIdentity);
 +    return ReplicationAdapterPtr(new ReplicationAdapterImpl(servant));
 +}
 +
diff --cc src/RTPSink.cpp
index d1cd962,7c65568..2a54fb3
--- a/src/RTPSink.cpp
+++ b/src/RTPSink.cpp
@@@ -119,7 -108,7 +119,7 @@@ void StreamSinkRTPImpl::write(const Ast
          int payload;
  
          // Only allow media formats through that we support
-         if ((payload = mImpl->mSessionAdapter->getPayload((*frame)->mediaformat)) < 0)
 -        if ((payload = mImpl->mSession->getPayload((*frame)->mediaFormat)) < 0)
++        if ((payload = mImpl->mSessionAdapter->getPayload((*frame)->mediaFormat)) < 0)
          {
              throw UnsupportedMediaFormatException();
          }
diff --cc src/RTPSource.cpp
index 9854098,c7c9c58..303ef7d
--- a/src/RTPSource.cpp
+++ b/src/RTPSource.cpp
@@@ -14,11 -14,9 +14,12 @@@
   * at the top of the source tree.
   */
  
 +#include "RTPSource.h"
 +#include "RtpStateReplicationIf.h"
 +
  #include <pjlib.h>
  #include <pjmedia.h>
+ 
  #include <Ice/Ice.h>
  #include <IceUtil/UUID.h>
  
@@@ -73,7 -70,10 +76,11 @@@ public
       */
      RtpStreamSourceStateItemPtr mSourceStateItem;
  
 +    string mSessionId;
+     /**
+      * Lock that protects information contained.
+      */
+     boost::shared_mutex mLock;
  };
  
  /**
@@@ -101,13 -96,29 +108,29 @@@ StreamSourceRTPImpl::StreamSourceRTPImp
  }
  
  /**
-  * Implementation of the setSink method as defined in MediaIf.ice
+  * Implementation of the addSink method as defined in MediaIf.ice
+  */
+ void StreamSourceRTPImpl::addSink(const AsteriskSCF::Media::V1::StreamSinkPrx& sink, const Ice::Current&)
+ {
+     boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
+ 
+     mImpl->mSourceStateItem->mSinks.push_back(sink);
+ 
 -    mImpl->mSession->replicateState(0, 0, mImpl->mSourceStateItem);
++    mImpl->mSessionAdapter->replicateState(mImpl->mSourceStateItem);
+ }
+ 
+ /**
+  * Implementation of the removeSink method as defined in MediaIf.ice
   */
- void StreamSourceRTPImpl::setSink(const AsteriskSCF::Media::V1::StreamSinkPrx& sink, const Ice::Current&)
+ void StreamSourceRTPImpl::removeSink(const AsteriskSCF::Media::V1::StreamSinkPrx& sink, const Ice::Current&)
  {
-     mImpl->mSourceStateItem->mSink = sink;
+     boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
+ 
+     mImpl->mSourceStateItem->mSinks.erase(std::remove(mImpl->mSourceStateItem->mSinks.begin(),
+ 	    mImpl->mSourceStateItem->mSinks.end(),
+ 	    sink), mImpl->mSourceStateItem->mSinks.end());
  
 -    mImpl->mSession->replicateState(0, 0, mImpl->mSourceStateItem);
 +    mImpl->mSessionAdapter->replicateState(mImpl->mSourceStateItem);
  }
  
  /**
@@@ -200,40 -212,72 +223,72 @@@ static void receiveRTP(void *userdata, 
          return;
      }
  
-     if (source->mImpl->mSourceStateItem->mSink != 0)
+     // Update RTP stack information before writing to sinks, it's fine to do it
+     pjmedia_rtp_session_update(&source->mImpl->mIncomingSession, header, NULL);
+ 
+     if (source->mImpl->mSourceStateItem->mSinks.empty())
+     {
+ 	// No sinks present so frames can not go anywhere
+ 	return;
+     }
+ 
 -    FormatPtr mediaformat = source->mImpl->mSession->getFormat(header->pt);
++    FormatPtr mediaformat = source->mImpl->mSessionAdapter->getFormat(header->pt);
+ 
+     if (!mediaformat)
      {
-         FormatPtr mediaformat = source->mImpl->mSessionAdapter->getFormat(header->pt);
+ 	// If this is for a payload we don't know about just drop the frame
+ 	return;
+     }
  
-         if (mediaformat != 0)
+     FrameSeq frames;
+ 
+     AudioFormatPtr audioformat;
+     VideoFormatPtr videoformat;
+ 
+     if ((audioformat = AudioFormatPtr::dynamicCast(mediaformat)))
+     {
+         AudioFramePtr frame = new AudioFrame();
+         frame->mediaFormat = mediaformat;
+ 
+         // Populate the common data
+         frame->timestamp = header->ts;
+         frame->seqno = header->seq;
+ 
+         // Copy the payload from the RTP packet into the frame
+         copy(payload, payload + payload_size, std::back_inserter(frame->payload));
+ 
+         // Into the frames sequence it goes
+         frames.push_back(frame);
+     }
+     else if ((videoformat = VideoFormatPtr::dynamicCast(mediaformat)))
+     {
+         VideoFramePtr frame = new VideoFrame();
+         frame->mediaFormat = mediaformat;
+         frame->timestamp = header->ts;
+         frame->seqno = header->seq;
+         copy(payload, payload + payload_size, std::back_inserter(frame->payload));
+         frames.push_back(frame);
+     }
+ 
+     if (frames.empty())
+     {
+         // If the media format ended up being a type we don't understand don't bother writing it out
+         return;
+     }
+ 
+     boost::shared_lock<boost::shared_mutex> lock(source->mImpl->mLock);
+ 
+     for (StreamSinkSeq::iterator sink = source->mImpl->mSourceStateItem->mSinks.begin();
+          sink != source->mImpl->mSourceStateItem->mSinks.end();
+          ++sink)
+     {
+         try
+         {
+             (*sink)->write(frames);
+         }
+         catch (const Ice::Exception&)
          {
-             FrameSeq frames;
- 
-             AudioFormatPtr audioformat;
- 
-             if ((audioformat = AudioFormatPtr::dynamicCast(mediaformat)))
-             {
-                 AudioFramePtr frame = new AudioFrame();
-                 frame->mediaformat = mediaformat;
- 
-                 /* Populate the common data */
-                 frame->timestamp = header->ts;
-                 frame->seqno = header->seq;
- 
-                 /* Copy the payload from the RTP packet to the frame, yahoo! */
-                 copy(payload, payload + payload_size, std::back_inserter(frame->payload));
- 
-                 /* Into the sequence it goes, yarrrrrrrrrr matey */
-                 frames.push_back(frame);
-             }
- 
-             try
-             {
-                 source->mImpl->mSourceStateItem->mSink->write(frames);
-             }
-             catch (const Ice::Exception&)
-             {
-                 lg(Error) << "Exception caught while attempting to write media to an RTP sink";
-             }
+             lg(Error) << "Exception caught while attempting to write media to RTP sink " << (*sink);
          }
      }
  
@@@ -293,7 -335,12 +346,8 @@@ RtpStreamSourceStateItemPtr StreamSourc
      return mImpl->mSourceStateItem;
  }
  
- void StreamSourceRTPImpl::setSinkImpl(const AsteriskSCF::Media::V1::StreamSinkPrx& proxy)
 -/**
 - * API call which sets the sinks.
 - */
 -void StreamSourceRTPImpl::setSinks(const AsteriskSCF::Media::V1::StreamSinkSeq& sinks)
++void StreamSourceRTPImpl::setSinksImpl(const AsteriskSCF::Media::V1::StreamSinkSeq& sinks)
  {
-     mImpl->mSourceStateItem->mSink = proxy;
+     boost::unique_lock<boost::shared_mutex> lock(mImpl->mLock);
 -
+     mImpl->mSourceStateItem->mSinks = sinks;
  }
diff --cc src/RTPSource.h
index 093eb6b,f032f88..f100cd4
--- a/src/RTPSource.h
+++ b/src/RTPSource.h
@@@ -24,22 -21,20 +24,25 @@@ class StreamSourceRTPImplPriv
  class StreamSourceRTPImpl : public AsteriskSCF::Media::RTP::V1::StreamSourceRTP
  {
  public:
 -    StreamSourceRTPImpl(const RTPSessionImplPtr&, const std::string&);
 +    StreamSourceRTPImpl(const AsteriskSCF::PJMediaRTP::SessionAdapterPtr& sessionAdapter, 
 +        const AsteriskSCF::PJMediaRTP::PJMediaTransportPtr& transport,
 +        const std::string& parentSessionId);
 +
-     void setSink(const AsteriskSCF::Media::V1::StreamSinkPrx&, const Ice::Current&);
-     AsteriskSCF::Media::V1::StreamSinkPrx getSink(const Ice::Current&);
+     void addSink(const AsteriskSCF::Media::V1::StreamSinkPrx&, const Ice::Current&);
+     void removeSink(const AsteriskSCF::Media::V1::StreamSinkPrx&, const Ice::Current&);
+     AsteriskSCF::Media::V1::StreamSinkSeq getSinks(const Ice::Current&);
      AsteriskSCF::Media::V1::FormatSeq getFormats(const Ice::Current&);
      std::string getId(const Ice::Current&);
      void requestFormat(const AsteriskSCF::Media::V1::FormatPtr&, const Ice::Current&);
      std::string getLocalAddress(const Ice::Current&);
      Ice::Int getLocalPort(const Ice::Current&);
+ 
      void setRemoteDetails(const std::string& address, Ice::Int port);
 -    void setSinks(const AsteriskSCF::Media::V1::StreamSinkSeq&);
++    void setSinksImpl(const AsteriskSCF::Media::V1::StreamSinkSeq&);
      AsteriskSCF::Replication::MediaRTPPJMedia::V1::RtpStreamSourceStateItemPtr getStateItem();
  
 +    void setSinkImpl(const AsteriskSCF::Media::V1::StreamSinkPrx& proxy);
 +
      /**
       * Private implementation data for StreamSourceRTPImpl.
       * Note: This is public on purpose so that our RTP callback can access it.

-----------------------------------------------------------------------


-- 
asterisk-scf/integration/media_rtp_pjmedia.git



More information about the asterisk-scf-commits mailing list