[asterisk-scf-commits] asterisk-scf/integration/media_rtp_pjmedia.git branch "replication" updated.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Wed Dec 8 08:12:54 CST 2010


branch "replication" has been updated
       via  7eacee9b505a41ed7e853abb2649e36edb5f75b8 (commit)
      from  dfff07e4c5f76afe413a808f40a7a75b05849a63 (commit)

Summary of changes:
 local-slice/RtpStateReplicationIf.ice |    2 +-
 src/RTPSession.cpp                    |   45 ++++++++++++++++++++++++-
 src/RTPSession.h                      |   13 +++++++
 src/RTPSource.h                       |    5 ---
 src/RtpStateReplicatorListener.cpp    |   60 +++++++++++++++++++++++++++++++++
 5 files changed, 118 insertions(+), 7 deletions(-)


- Log -----------------------------------------------------------------
commit 7eacee9b505a41ed7e853abb2649e36edb5f75b8
Author: Joshua Colp <jcolp at digium.com>
Date:   Wed Dec 8 10:12:14 2010 -0400

    Add an actual implementation for the listener.

diff --git a/local-slice/RtpStateReplicationIf.ice b/local-slice/RtpStateReplicationIf.ice
index d88bbfe..1e405e6 100644
--- a/local-slice/RtpStateReplicationIf.ice
+++ b/local-slice/RtpStateReplicationIf.ice
@@ -69,7 +69,7 @@ module V1
    class RtpSessionStateItem extends RtpStateItem
    {
        Ice::Identity mSessionIdentity;
-       int port;
+       int mPort;
        Ice::Identity mSinkIdentity;
        Ice::Identity mSourceIdentity;
        AsteriskSCF::Media::V1::FormatSeq mFormats;
diff --git a/src/RTPSession.cpp b/src/RTPSession.cpp
index b61a4a2..657a3a9 100644
--- a/src/RTPSession.cpp
+++ b/src/RTPSession.cpp
@@ -107,7 +107,7 @@ public:
 };
 
 /**
- * Constructor for the RTPSessionImpl class.
+ * Constructor for the RTPSessionImpl class (used by Ice).
  */
 RTPSessionImpl::RTPSessionImpl(Ice::ObjectAdapterPtr adapter, const FormatSeq& formats, pj_pool_factory* factory) : mImpl(new RTPSessionImplPriv(adapter, formats))
 {
@@ -141,6 +141,33 @@ RTPSessionImpl::RTPSessionImpl(Ice::ObjectAdapterPtr adapter, const FormatSeq& f
 }
 
 /**
+ * Constructor for the RTPSessionImpl class (used by state replicator).
+ */
+RTPSessionImpl::RTPSessionImpl(Ice::ObjectAdapterPtr adapter, pj_pool_factory* factory, Ice::Identity sessionIdentity, Ice::Identity sinkIdentity,
+    Ice::Identity sourceIdentity, Ice::Int port, const FormatSeq& formats) : mImpl(new RTPSessionImplPriv(adapter, formats))
+{
+    mImpl->mProxy = RTPSessionPrx::uncheckedCast(adapter->add(this, sessionIdentity));
+
+    pj_status_t status = pjmedia_endpt_create(factory, NULL, 1, &mImpl->mEndpoint);
+
+    if (status != PJ_SUCCESS)
+    {
+	/* TODO: This is bad... we can't go on! */
+    }
+
+    if ((status = pjmedia_transport_udp_create2(mImpl->mEndpoint, "RTP", NULL, port, 0, &mImpl->mTransport)) != PJ_SUCCESS)
+    {
+	// TODO: This is also bad, something is using the port
+    }
+
+    mImpl->mStreamSource = new StreamSourceRTPImpl(this);
+    mImpl->mStreamSourceProxy = StreamSourceRTPPrx::uncheckedCast(mImpl->mAdapter->add(mImpl->mStreamSource, sourceIdentity));
+
+    mImpl->mStreamSink = new StreamSinkRTPImpl(this);
+    mImpl->mStreamSinkProxy = StreamSinkRTPPrx::uncheckedCast(mImpl->mAdapter->add(mImpl->mStreamSink, sinkIdentity));
+}
+
+/**
  * Destructor for the RTPSessionImplPriv class.
  */
 RTPSessionImplPriv::~RTPSessionImplPriv()
@@ -306,3 +333,19 @@ int RTPSessionImpl::getPayload(const FormatPtr& mediaformat)
 
     return (*it).second;
 }
+
+/**
+ * API call which returns a pointer to the source.
+ */
+StreamSourceRTPImplPtr RTPSessionImpl::getSource()
+{
+    return mImpl->mStreamSource;
+}
+
+/**
+ * API call which returns a pointer to the sink.
+ */
+StreamSinkRTPPtr RTPSessionImpl::getSink()
+{
+    return mImpl->mStreamSink;
+}
diff --git a/src/RTPSession.h b/src/RTPSession.h
index ff9585d..5a72f8b 100644
--- a/src/RTPSession.h
+++ b/src/RTPSession.h
@@ -16,12 +16,23 @@
 class RTPSessionImplPriv;
 
 /**
+ * Forward declaration of our private implementation of StreamSourceRTP.
+ */
+class StreamSourceRTPImpl;
+
+/**
+ * A typedef which creates a smart pointer type for StreamSourceRTPImpl.
+ */
+typedef IceUtil::Handle<StreamSourceRTPImpl> StreamSourceRTPImplPtr;
+
+/**
  * Implementation of the RTPSession interface as defined in MediaRTPIf.ice
  */
 class RTPSessionImpl : public AsteriskSCF::Media::RTP::V1::RTPSession
 {
 public:
     RTPSessionImpl(Ice::ObjectAdapterPtr, const AsteriskSCF::Media::V1::FormatSeq&, pj_pool_factory*);
+    RTPSessionImpl(Ice::ObjectAdapterPtr, pj_pool_factory*, Ice::Identity, Ice::Identity, Ice::Identity, Ice::Int, const AsteriskSCF::Media::V1::FormatSeq&);
     AsteriskSCF::Media::V1::StreamSourceSeq getSources(const Ice::Current&);
     AsteriskSCF::Media::V1::StreamSinkSeq getSinks(const Ice::Current&);
     std::string getId(const Ice::Current&);
@@ -35,6 +46,8 @@ public:
     void setRemoteDetails(std::string address, int port);
     AsteriskSCF::Media::V1::FormatPtr getFormat(int payload);
     int getPayload(const AsteriskSCF::Media::V1::FormatPtr& mediaformat);
+    StreamSourceRTPImplPtr getSource();
+    AsteriskSCF::Media::RTP::V1::StreamSinkRTPPtr getSink();
 private:
     /**
      * Private implementation data for RTPSessionImpl.
diff --git a/src/RTPSource.h b/src/RTPSource.h
index bd92dea..5d0d5e9 100644
--- a/src/RTPSource.h
+++ b/src/RTPSource.h
@@ -37,8 +37,3 @@ public:
      */
     boost::shared_ptr<StreamSourceRTPImplPriv> mImpl;
 };
-
-/**
- * A typedef which creates a smart pointer type for StreamSourceRTPImpl.
- */
-typedef IceUtil::Handle<StreamSourceRTPImpl> StreamSourceRTPImplPtr;
diff --git a/src/RtpStateReplicatorListener.cpp b/src/RtpStateReplicatorListener.cpp
index d467bd3..0686367 100644
--- a/src/RtpStateReplicatorListener.cpp
+++ b/src/RtpStateReplicatorListener.cpp
@@ -19,7 +19,13 @@
 #include <boost/thread.hpp>
 #include <boost/shared_ptr.hpp>
 
+#include <pjlib.h>
+#include <pjmedia.h>
+
 #include "RtpStateReplicator.h"
+#include "RTPSession.h"
+#include "RTPSink.h"
+#include "RTPSource.h"
 
 using namespace AsteriskSCF::Media::RTP::V1;
 
@@ -29,9 +35,18 @@ public:
     RtpStateReplicatorItem() { }
     ~RtpStateReplicatorItem()
     {
+	mSession->release(Ice::Current());
     }
 
+    // Helper function which sets the session on this replicator item
+    void setSession(RTPSessionImplPtr session) { mSession = session; };
+
+    // Helper function which gets the session
+    RTPSessionImplPtr getSession() { return mSession; };
+
 private:
+    // Pointer to the session that we are managing
+    RTPSessionImplPtr mSession;
 };
 
 struct RtpStateReplicatorListenerImpl
@@ -52,10 +67,55 @@ public:
         for (RtpStateItemSeq::const_iterator item = items.begin(); item != items.end(); ++item)
         {
             std::map<std::string, boost::shared_ptr<RtpStateReplicatorItem> >::iterator i = mStateItems.find((*item)->mSessionId);
+	    RtpSessionStateItemPtr session;
+	    RtpStreamSinkStateItemPtr sink;
+	    RtpStreamSourceStateItemPtr source;
+	    boost::shared_ptr<RtpStateReplicatorItem> localitem;
+
+	    if ((session = RtpSessionStateItemPtr::dynamicCast((*item))))
+	    {
+		if (i == mStateItems.end())
+		{
+		    boost::shared_ptr<RtpStateReplicatorItem> newitem(new RtpStateReplicatorItem());
+		    mStateItems.insert(std::make_pair<std::string, boost::shared_ptr<RtpStateReplicatorItem> >((*item)->mSessionId, newitem));
+		    localitem = newitem;
+
+		    RTPSessionImplPtr localSession = new RTPSessionImpl(mAdapter, mPoolFactory, session->mSessionIdentity, session->mSinkIdentity,
+			session->mSourceIdentity, session->mPort, session->mFormats);
+		    localitem->setSession(localSession);
+		}
+		else
+		{
+		    localitem = i->second;
+		}
+
+	        localitem->getSession()->associatePayloads(session->mPayloadstoFormats, Ice::Current());
+	    }
+	    else if ((sink = RtpStreamSinkStateItemPtr::dynamicCast((*item))))
+	    {
+		if (i == mStateItems.end())
+		{
+		    continue;
+		}
+
+		i->second->getSession()->getSink()->setSource(sink->mSource, Ice::Current());
+		i->second->getSession()->getSink()->setRemoteDetails(sink->mRemoteAddress, sink->mRemotePort, Ice::Current());
+	    }
+	    else if ((source = RtpStreamSourceStateItemPtr::dynamicCast((*item))))
+	    {
+		if (i == mStateItems.end())
+		{
+		    continue;
+		}
+
+		i->second->getSession()->getSource()->setSink(source->mSink, Ice::Current());
+	    }
         }
     }
     std::string mId;
     std::map<std::string, boost::shared_ptr<RtpStateReplicatorItem> > mStateItems;
+    Ice::ObjectAdapterPtr mAdapter;
+    pj_pool_factory *mPoolFactory;
 };
 
 RtpStateReplicatorListenerI::RtpStateReplicatorListenerI()

-----------------------------------------------------------------------


-- 
asterisk-scf/integration/media_rtp_pjmedia.git



More information about the asterisk-scf-commits mailing list