[asterisk-scf-commits] asterisk-scf/integration/media_rtp_pjmedia.git branch "replication" updated.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Wed Dec 8 09:47:56 CST 2010


branch "replication" has been updated
       via  fa399cca8fa1687c155e2513a27cffffe435192a (commit)
      from  1d3fefe487649815ea6ee8893a5069bf53f06193 (commit)

Summary of changes:
 src/MediaRTPpjmedia.cpp |    1 +
 src/RTPSession.cpp      |   67 +++++++++++++++++++++++++++++++++++++++++++---
 src/RTPSession.h        |    2 +
 src/RTPSink.cpp         |   11 ++++---
 src/RTPSink.h           |    2 +-
 src/RTPSource.cpp       |    9 +++---
 src/RTPSource.h         |    2 +-
 7 files changed, 78 insertions(+), 16 deletions(-)


- Log -----------------------------------------------------------------
commit fa399cca8fa1687c155e2513a27cffffe435192a
Author: Joshua Colp <jcolp at digium.com>
Date:   Wed Dec 8 11:47:10 2010 -0400

    Associate source and sink state items with session state item, and call into replicateState and removeState at the proper times.

diff --git a/src/MediaRTPpjmedia.cpp b/src/MediaRTPpjmedia.cpp
index bddfeda..7397bfc 100644
--- a/src/MediaRTPpjmedia.cpp
+++ b/src/MediaRTPpjmedia.cpp
@@ -28,6 +28,7 @@
 #include "System/Component/ComponentServiceIf.h"
 #include "IceLogger.h"
 #include "logger.h"
+#include "RtpStateReplicationIf.h"
 
 #include "RTPSession.h"
 
diff --git a/src/RTPSession.cpp b/src/RTPSession.cpp
index d2163ad..134ec5f 100644
--- a/src/RTPSession.cpp
+++ b/src/RTPSession.cpp
@@ -140,16 +140,17 @@ RTPSessionImpl::RTPSessionImpl(Ice::ObjectAdapterPtr adapter, const FormatSeq& f
     mImpl->mSessionStateItem->mFormats = formats;
 
     /* First up for our own stuff is... a source! Media needs to come from somewhere, you know. */
-    mImpl->mStreamSource = new StreamSourceRTPImpl(this);
+    mImpl->mStreamSource = new StreamSourceRTPImpl(this, mImpl->mSessionStateItem->key);
     mImpl->mStreamSourceProxy = StreamSourceRTPPrx::uncheckedCast(mImpl->mAdapter->addWithUUID(mImpl->mStreamSource));
     mImpl->mSessionStateItem->mSourceIdentity = mImpl->mStreamSourceProxy->ice_getIdentity();
 
     /* And for my next trick a place for us to send media out. */
-    mImpl->mStreamSink = new StreamSinkRTPImpl(this);
+    mImpl->mStreamSink = new StreamSinkRTPImpl(this, mImpl->mSessionStateItem->key);
     mImpl->mStreamSinkProxy = StreamSinkRTPPrx::uncheckedCast(mImpl->mAdapter->addWithUUID(mImpl->mStreamSink));
     mImpl->mSessionStateItem->mSinkIdentity = mImpl->mStreamSinkProxy->ice_getIdentity();
 
-    // TODO: Send out replication update for session, source, and sink
+    // Since everything has just come into creation send one big update with all state items
+    replicateState(mImpl->mSessionStateItem, mImpl->mStreamSink->getStateItem(), mImpl->mStreamSource->getStateItem());
 }
 
 /**
@@ -172,10 +173,10 @@ RTPSessionImpl::RTPSessionImpl(Ice::ObjectAdapterPtr adapter, pj_pool_factory* f
 	// TODO: This is also bad, something is using the port */
     }
 
-    mImpl->mStreamSource = new StreamSourceRTPImpl(this);
+    mImpl->mStreamSource = new StreamSourceRTPImpl(this, 0);
     mImpl->mStreamSourceProxy = StreamSourceRTPPrx::uncheckedCast(mImpl->mAdapter->add(mImpl->mStreamSource, sourceIdentity));
 
-    mImpl->mStreamSink = new StreamSinkRTPImpl(this);
+    mImpl->mStreamSink = new StreamSinkRTPImpl(this, 0);
     mImpl->mStreamSinkProxy = StreamSinkRTPPrx::uncheckedCast(mImpl->mAdapter->add(mImpl->mStreamSink, sinkIdentity));
 }
 
@@ -245,6 +246,9 @@ RTCPSessionPrx RTPSessionImpl::getRTCPSession(const Ice::Current&)
  */
 void RTPSessionImpl::release(const Ice::Current&)
 {
+    // Remove everything from the state replicator if present
+    removeState(mImpl->mSessionStateItem, mImpl->mStreamSink->getStateItem(), mImpl->mStreamSource->getStateItem());
+
     /* Drop the source and sink from the ASM */
     mImpl->mAdapter->remove(mImpl->mStreamSourceProxy->ice_getIdentity());
     mImpl->mAdapter->remove(mImpl->mStreamSinkProxy->ice_getIdentity());
@@ -272,6 +276,9 @@ void RTPSessionImpl::associatePayloads(const AsteriskSCF::Media::RTP::V1::Payloa
     {
         mImpl->mFormatstoPayloads.insert(make_pair((*it).second->name, (*it).first));
     }
+
+    // Only the session has changed so push a single update out for it
+    replicateState(mImpl->mSessionStateItem, 0, 0);
 }
 
 /**
@@ -361,3 +368,53 @@ StreamSinkRTPPtr RTPSessionImpl::getSink()
 {
     return mImpl->mStreamSink;
 }
+
+/**
+ * API call which replicates state items: gathers each non-null item (session, sink, source) into one RtpStateItemSeq. In this revision the sequence is only built locally; nothing is sent to a replicator yet.
+ */
+void RTPSessionImpl::replicateState(RtpSessionStateItemPtr session, RtpStreamSinkStateItemPtr sink, RtpStreamSourceStateItemPtr source)
+{
+    // TODO: if state replication has been disabled do nothing (no such check is implemented here yet)
+
+    RtpStateItemSeq items;
+
+    if (session)
+    {
+	items.push_back(session);
+    }
+
+    if (sink)
+    {
+	items.push_back(sink);
+    }
+
+    if (source)
+    {
+	items.push_back(source);
+    }
+}
+
+/**
+ * API call which removes state items from the replicator: gathers the `key` of each non-null item into an Ice::StringSeq. In this revision the key list is only built locally; no removal request is issued yet.
+ */
+void RTPSessionImpl::removeState(RtpSessionStateItemPtr session, RtpStreamSinkStateItemPtr sink, RtpStreamSourceStateItemPtr source)
+{
+    // TODO: if state replication has been disabled do nothing (no such check is implemented here yet)
+
+    Ice::StringSeq items;
+
+    if (session)
+    {
+	items.push_back(session->key);
+    }
+
+    if (sink)
+    {
+	items.push_back(sink->key);
+    }
+
+    if (source)
+    {
+	items.push_back(source->key);
+    }
+}
diff --git a/src/RTPSession.h b/src/RTPSession.h
index 2b72c0b..005a054 100644
--- a/src/RTPSession.h
+++ b/src/RTPSession.h
@@ -58,6 +58,8 @@ public:
     int getPayload(const AsteriskSCF::Media::V1::FormatPtr& mediaformat);
     StreamSourceRTPImplPtr getSource();
     AsteriskSCF::Media::RTP::V1::StreamSinkRTPPtr getSink();
+    void replicateState(AsteriskSCF::Media::RTP::V1::RtpSessionStateItemPtr, AsteriskSCF::Media::RTP::V1::RtpStreamSinkStateItemPtr, AsteriskSCF::Media::RTP::V1::RtpStreamSourceStateItemPtr);
+    void removeState(AsteriskSCF::Media::RTP::V1::RtpSessionStateItemPtr, AsteriskSCF::Media::RTP::V1::RtpStreamSinkStateItemPtr, AsteriskSCF::Media::RTP::V1::RtpStreamSourceStateItemPtr);
 private:
     /**
      * Private implementation data for RTPSessionImpl.
diff --git a/src/RTPSink.cpp b/src/RTPSink.cpp
index c30e611..c07c251 100644
--- a/src/RTPSink.cpp
+++ b/src/RTPSink.cpp
@@ -41,7 +41,7 @@ public:
     /**
      * Constructor for our StreamSinkRTPImplPriv class.
      */
-    StreamSinkRTPImplPriv(RTPSessionImplPtr);
+    StreamSinkRTPImplPriv(RTPSessionImplPtr, std::string);
 
     /**
      * A structure containing outgoing pjmedia session data.
@@ -62,9 +62,10 @@ public:
 /**
  * Constructor for the StreamSinkRTPImplPriv class.
  */
-StreamSinkRTPImplPriv::StreamSinkRTPImplPriv(RTPSessionImplPtr session) : mSession(session), mSinkStateItem(new RtpStreamSinkStateItem())
+StreamSinkRTPImplPriv::StreamSinkRTPImplPriv(RTPSessionImplPtr session, std::string sessionId) : mSession(session), mSinkStateItem(new RtpStreamSinkStateItem())
 {
     pjmedia_rtp_session_init(&mOutgoingSession, 0, pj_rand());
+    mSinkStateItem->mSessionId = sessionId;
     mSinkStateItem->key = IceUtil::generateUUID();
     mSinkStateItem->mRemotePort = 0;
 };
@@ -72,7 +73,7 @@ StreamSinkRTPImplPriv::StreamSinkRTPImplPriv(RTPSessionImplPtr session) : mSessi
 /**
  * Constructor for the StreamSinkRTPImpl class.
  */
-StreamSinkRTPImpl::StreamSinkRTPImpl(RTPSessionImplPtr session) : mImpl(new StreamSinkRTPImplPriv(session))
+StreamSinkRTPImpl::StreamSinkRTPImpl(RTPSessionImplPtr session, std::string sessionId) : mImpl(new StreamSinkRTPImplPriv(session, sessionId))
 {
 }
 
@@ -144,7 +145,7 @@ void StreamSinkRTPImpl::setSource(const AsteriskSCF::Media::V1::StreamSourcePrx&
 {
     mImpl->mSinkStateItem->mSource = source;
 
-    // TODO: Send state item off for replication
+    mImpl->mSession->replicateState(0, mImpl->mSinkStateItem, 0);
 }
 
 /**
@@ -188,7 +189,7 @@ void StreamSinkRTPImpl::setRemoteDetails(const std::string& address, Ice::Int po
     mImpl->mSinkStateItem->mRemoteAddress = address;
     mImpl->mSinkStateItem->mRemotePort = port;
 
-    // TODO: Send state item off for replication
+    mImpl->mSession->replicateState(0, mImpl->mSinkStateItem, 0);
 }
 
 /**
diff --git a/src/RTPSink.h b/src/RTPSink.h
index cf5c114..a350d18 100644
--- a/src/RTPSink.h
+++ b/src/RTPSink.h
@@ -21,7 +21,7 @@ class StreamSinkRTPImplPriv;
 class StreamSinkRTPImpl : public AsteriskSCF::Media::RTP::V1::StreamSinkRTP
 {
 public:
-    StreamSinkRTPImpl(RTPSessionImplPtr);
+    StreamSinkRTPImpl(RTPSessionImplPtr, std::string);
     void write(const AsteriskSCF::Media::V1::FrameSeq&, const Ice::Current&);
     void setSource(const AsteriskSCF::Media::V1::StreamSourcePrx&, const Ice::Current&);
     AsteriskSCF::Media::V1::StreamSourcePrx getSource(const Ice::Current&);
diff --git a/src/RTPSource.cpp b/src/RTPSource.cpp
index 8929c0d..1b8e14e 100644
--- a/src/RTPSource.cpp
+++ b/src/RTPSource.cpp
@@ -48,7 +48,7 @@ public:
     /**
      * Constructor for our StreamSourceRTPImplPriv class.
      */
-    StreamSourceRTPImplPriv(RTPSessionImplPtr);
+    StreamSourceRTPImplPriv(RTPSessionImplPtr, std::string);
 
     /**
      * A structure containing incoming pjmedia session data.
@@ -69,16 +69,17 @@ public:
 /**
  * Constructor for the StreamSourceRTPImplPriv class.
  */
-StreamSourceRTPImplPriv::StreamSourceRTPImplPriv(RTPSessionImplPtr session) : mSession(session), mSourceStateItem(new RtpStreamSourceStateItem())
+StreamSourceRTPImplPriv::StreamSourceRTPImplPriv(RTPSessionImplPtr session, std::string sessionId) : mSession(session), mSourceStateItem(new RtpStreamSourceStateItem())
 {
     pjmedia_rtp_session_init(&mIncomingSession, 0, 0);
+    mSourceStateItem->mSessionId = sessionId;
     mSourceStateItem->key = IceUtil::generateUUID();
 };
 
 /**
  * Constructor for the StreamSourceRTPImpl class.
  */
-StreamSourceRTPImpl::StreamSourceRTPImpl(RTPSessionImplPtr session) : mImpl(new StreamSourceRTPImplPriv(session))
+StreamSourceRTPImpl::StreamSourceRTPImpl(RTPSessionImplPtr session, std::string sessionId) : mImpl(new StreamSourceRTPImplPriv(session, sessionId))
 {
 }
 
@@ -89,7 +90,7 @@ void StreamSourceRTPImpl::setSink(const AsteriskSCF::Media::V1::StreamSinkPrx& s
 {
     mImpl->mSourceStateItem->mSink = sink;
 
-    // TODO: Send off for replication
+    mImpl->mSession->replicateState(0, 0, mImpl->mSourceStateItem);
 }
 
 /**
diff --git a/src/RTPSource.h b/src/RTPSource.h
index 16a6f7c..c6d9d4a 100644
--- a/src/RTPSource.h
+++ b/src/RTPSource.h
@@ -21,7 +21,7 @@ class StreamSourceRTPImplPriv;
 class StreamSourceRTPImpl : public AsteriskSCF::Media::RTP::V1::StreamSourceRTP
 {
 public:
-    StreamSourceRTPImpl(RTPSessionImplPtr);
+    StreamSourceRTPImpl(RTPSessionImplPtr, std::string);
     void setSink(const AsteriskSCF::Media::V1::StreamSinkPrx&, const Ice::Current&);
     AsteriskSCF::Media::V1::StreamSinkPrx getSink(const Ice::Current&);
     AsteriskSCF::Media::V1::FormatSeq getFormats(const Ice::Current&);

-----------------------------------------------------------------------


-- 
asterisk-scf/integration/media_rtp_pjmedia.git



More information about the asterisk-scf-commits mailing list