[asterisk-scf-commits] asterisk-scf/integration/media_rtp_pjmedia.git branch "replication" updated.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Wed Dec 8 09:47:56 CST 2010
branch "replication" has been updated
via fa399cca8fa1687c155e2513a27cffffe435192a (commit)
from 1d3fefe487649815ea6ee8893a5069bf53f06193 (commit)
Summary of changes:
src/MediaRTPpjmedia.cpp | 1 +
src/RTPSession.cpp | 67 +++++++++++++++++++++++++++++++++++++++++++---
src/RTPSession.h | 2 +
src/RTPSink.cpp | 11 ++++---
src/RTPSink.h | 2 +-
src/RTPSource.cpp | 9 +++---
src/RTPSource.h | 2 +-
7 files changed, 78 insertions(+), 16 deletions(-)
- Log -----------------------------------------------------------------
commit fa399cca8fa1687c155e2513a27cffffe435192a
Author: Joshua Colp <jcolp at digium.com>
Date: Wed Dec 8 11:47:10 2010 -0400
Associate source and sink state items with session state item, and call into replicateState and removeState at the proper times.
diff --git a/src/MediaRTPpjmedia.cpp b/src/MediaRTPpjmedia.cpp
index bddfeda..7397bfc 100644
--- a/src/MediaRTPpjmedia.cpp
+++ b/src/MediaRTPpjmedia.cpp
@@ -28,6 +28,7 @@
#include "System/Component/ComponentServiceIf.h"
#include "IceLogger.h"
#include "logger.h"
+#include "RtpStateReplicationIf.h"
#include "RTPSession.h"
diff --git a/src/RTPSession.cpp b/src/RTPSession.cpp
index d2163ad..134ec5f 100644
--- a/src/RTPSession.cpp
+++ b/src/RTPSession.cpp
@@ -140,16 +140,17 @@ RTPSessionImpl::RTPSessionImpl(Ice::ObjectAdapterPtr adapter, const FormatSeq& f
mImpl->mSessionStateItem->mFormats = formats;
/* First up for our own stuff is... a source! Media needs to come from somewhere, you know. */
- mImpl->mStreamSource = new StreamSourceRTPImpl(this);
+ mImpl->mStreamSource = new StreamSourceRTPImpl(this, mImpl->mSessionStateItem->key);
mImpl->mStreamSourceProxy = StreamSourceRTPPrx::uncheckedCast(mImpl->mAdapter->addWithUUID(mImpl->mStreamSource));
mImpl->mSessionStateItem->mSourceIdentity = mImpl->mStreamSourceProxy->ice_getIdentity();
/* And for my next trick a place for us to send media out. */
- mImpl->mStreamSink = new StreamSinkRTPImpl(this);
+ mImpl->mStreamSink = new StreamSinkRTPImpl(this, mImpl->mSessionStateItem->key);
mImpl->mStreamSinkProxy = StreamSinkRTPPrx::uncheckedCast(mImpl->mAdapter->addWithUUID(mImpl->mStreamSink));
mImpl->mSessionStateItem->mSinkIdentity = mImpl->mStreamSinkProxy->ice_getIdentity();
- // TODO: Send out replication update for session, source, and sink
+ // Since everything has just come into creation send one big update with all state items
+ replicateState(mImpl->mSessionStateItem, mImpl->mStreamSink->getStateItem(), mImpl->mStreamSource->getStateItem());
}
/**
@@ -172,10 +173,10 @@ RTPSessionImpl::RTPSessionImpl(Ice::ObjectAdapterPtr adapter, pj_pool_factory* f
// TODO: This is also bad, something is using the port */
}
- mImpl->mStreamSource = new StreamSourceRTPImpl(this);
+ mImpl->mStreamSource = new StreamSourceRTPImpl(this, 0);
mImpl->mStreamSourceProxy = StreamSourceRTPPrx::uncheckedCast(mImpl->mAdapter->add(mImpl->mStreamSource, sourceIdentity));
- mImpl->mStreamSink = new StreamSinkRTPImpl(this);
+ mImpl->mStreamSink = new StreamSinkRTPImpl(this, 0);
mImpl->mStreamSinkProxy = StreamSinkRTPPrx::uncheckedCast(mImpl->mAdapter->add(mImpl->mStreamSink, sinkIdentity));
}
@@ -245,6 +246,9 @@ RTCPSessionPrx RTPSessionImpl::getRTCPSession(const Ice::Current&)
*/
void RTPSessionImpl::release(const Ice::Current&)
{
+ // Remove everything from the state replicator if present
+ removeState(mImpl->mSessionStateItem, mImpl->mStreamSink->getStateItem(), mImpl->mStreamSource->getStateItem());
+
/* Drop the source and sink from the ASM */
mImpl->mAdapter->remove(mImpl->mStreamSourceProxy->ice_getIdentity());
mImpl->mAdapter->remove(mImpl->mStreamSinkProxy->ice_getIdentity());
@@ -272,6 +276,9 @@ void RTPSessionImpl::associatePayloads(const AsteriskSCF::Media::RTP::V1::Payloa
{
mImpl->mFormatstoPayloads.insert(make_pair((*it).second->name, (*it).first));
}
+
+ // Only the session has changed so push a single update out for it
+ replicateState(mImpl->mSessionStateItem, 0, 0);
}
/**
@@ -361,3 +368,53 @@ StreamSinkRTPPtr RTPSessionImpl::getSink()
{
return mImpl->mStreamSink;
}
+
+/**
+ * API call which replicates state items.
+ */
+void RTPSessionImpl::replicateState(RtpSessionStateItemPtr session, RtpStreamSinkStateItemPtr sink, RtpStreamSourceStateItemPtr source)
+{
+ // If state replication has been disabled do nothing
+
+ RtpStateItemSeq items;
+
+ if (session)
+ {
+ items.push_back(session);
+ }
+
+ if (sink)
+ {
+ items.push_back(sink);
+ }
+
+ if (source)
+ {
+ items.push_back(source);
+ }
+}
+
+/**
+ * API call which removes state items from the replicator.
+ */
+void RTPSessionImpl::removeState(RtpSessionStateItemPtr session, RtpStreamSinkStateItemPtr sink, RtpStreamSourceStateItemPtr source)
+{
+ // If state replication has been disabled do nothing
+
+ Ice::StringSeq items;
+
+ if (session)
+ {
+ items.push_back(session->key);
+ }
+
+ if (sink)
+ {
+ items.push_back(sink->key);
+ }
+
+ if (source)
+ {
+ items.push_back(source->key);
+ }
+}
diff --git a/src/RTPSession.h b/src/RTPSession.h
index 2b72c0b..005a054 100644
--- a/src/RTPSession.h
+++ b/src/RTPSession.h
@@ -58,6 +58,8 @@ public:
int getPayload(const AsteriskSCF::Media::V1::FormatPtr& mediaformat);
StreamSourceRTPImplPtr getSource();
AsteriskSCF::Media::RTP::V1::StreamSinkRTPPtr getSink();
+ void replicateState(AsteriskSCF::Media::RTP::V1::RtpSessionStateItemPtr, AsteriskSCF::Media::RTP::V1::RtpStreamSinkStateItemPtr, AsteriskSCF::Media::RTP::V1::RtpStreamSourceStateItemPtr);
+ void removeState(AsteriskSCF::Media::RTP::V1::RtpSessionStateItemPtr, AsteriskSCF::Media::RTP::V1::RtpStreamSinkStateItemPtr, AsteriskSCF::Media::RTP::V1::RtpStreamSourceStateItemPtr);
private:
/**
* Private implementation data for RTPSessionImpl.
diff --git a/src/RTPSink.cpp b/src/RTPSink.cpp
index c30e611..c07c251 100644
--- a/src/RTPSink.cpp
+++ b/src/RTPSink.cpp
@@ -41,7 +41,7 @@ public:
/**
* Constructor for our StreamSinkRTPImplPriv class.
*/
- StreamSinkRTPImplPriv(RTPSessionImplPtr);
+ StreamSinkRTPImplPriv(RTPSessionImplPtr, std::string);
/**
* A structure containing outgoing pjmedia session data.
@@ -62,9 +62,10 @@ public:
/**
* Constructor for the StreamSinkRTPImplPriv class.
*/
-StreamSinkRTPImplPriv::StreamSinkRTPImplPriv(RTPSessionImplPtr session) : mSession(session), mSinkStateItem(new RtpStreamSinkStateItem())
+StreamSinkRTPImplPriv::StreamSinkRTPImplPriv(RTPSessionImplPtr session, std::string sessionId) : mSession(session), mSinkStateItem(new RtpStreamSinkStateItem())
{
pjmedia_rtp_session_init(&mOutgoingSession, 0, pj_rand());
+ mSinkStateItem->mSessionId = sessionId;
mSinkStateItem->key = IceUtil::generateUUID();
mSinkStateItem->mRemotePort = 0;
};
@@ -72,7 +73,7 @@ StreamSinkRTPImplPriv::StreamSinkRTPImplPriv(RTPSessionImplPtr session) : mSessi
/**
* Constructor for the StreamSinkRTPImpl class.
*/
-StreamSinkRTPImpl::StreamSinkRTPImpl(RTPSessionImplPtr session) : mImpl(new StreamSinkRTPImplPriv(session))
+StreamSinkRTPImpl::StreamSinkRTPImpl(RTPSessionImplPtr session, std::string sessionId) : mImpl(new StreamSinkRTPImplPriv(session, sessionId))
{
}
@@ -144,7 +145,7 @@ void StreamSinkRTPImpl::setSource(const AsteriskSCF::Media::V1::StreamSourcePrx&
{
mImpl->mSinkStateItem->mSource = source;
- // TODO: Send state item off for replication
+ mImpl->mSession->replicateState(0, mImpl->mSinkStateItem, 0);
}
/**
@@ -188,7 +189,7 @@ void StreamSinkRTPImpl::setRemoteDetails(const std::string& address, Ice::Int po
mImpl->mSinkStateItem->mRemoteAddress = address;
mImpl->mSinkStateItem->mRemotePort = port;
- // TODO: Send state item off for replication
+ mImpl->mSession->replicateState(0, mImpl->mSinkStateItem, 0);
}
/**
diff --git a/src/RTPSink.h b/src/RTPSink.h
index cf5c114..a350d18 100644
--- a/src/RTPSink.h
+++ b/src/RTPSink.h
@@ -21,7 +21,7 @@ class StreamSinkRTPImplPriv;
class StreamSinkRTPImpl : public AsteriskSCF::Media::RTP::V1::StreamSinkRTP
{
public:
- StreamSinkRTPImpl(RTPSessionImplPtr);
+ StreamSinkRTPImpl(RTPSessionImplPtr, std::string);
void write(const AsteriskSCF::Media::V1::FrameSeq&, const Ice::Current&);
void setSource(const AsteriskSCF::Media::V1::StreamSourcePrx&, const Ice::Current&);
AsteriskSCF::Media::V1::StreamSourcePrx getSource(const Ice::Current&);
diff --git a/src/RTPSource.cpp b/src/RTPSource.cpp
index 8929c0d..1b8e14e 100644
--- a/src/RTPSource.cpp
+++ b/src/RTPSource.cpp
@@ -48,7 +48,7 @@ public:
/**
* Constructor for our StreamSourceRTPImplPriv class.
*/
- StreamSourceRTPImplPriv(RTPSessionImplPtr);
+ StreamSourceRTPImplPriv(RTPSessionImplPtr, std::string);
/**
* A structure containing incoming pjmedia session data.
@@ -69,16 +69,17 @@ public:
/**
* Constructor for the StreamSourceRTPImplPriv class.
*/
-StreamSourceRTPImplPriv::StreamSourceRTPImplPriv(RTPSessionImplPtr session) : mSession(session), mSourceStateItem(new RtpStreamSourceStateItem())
+StreamSourceRTPImplPriv::StreamSourceRTPImplPriv(RTPSessionImplPtr session, std::string sessionId) : mSession(session), mSourceStateItem(new RtpStreamSourceStateItem())
{
pjmedia_rtp_session_init(&mIncomingSession, 0, 0);
+ mSourceStateItem->mSessionId = sessionId;
mSourceStateItem->key = IceUtil::generateUUID();
};
/**
* Constructor for the StreamSourceRTPImpl class.
*/
-StreamSourceRTPImpl::StreamSourceRTPImpl(RTPSessionImplPtr session) : mImpl(new StreamSourceRTPImplPriv(session))
+StreamSourceRTPImpl::StreamSourceRTPImpl(RTPSessionImplPtr session, std::string sessionId) : mImpl(new StreamSourceRTPImplPriv(session, sessionId))
{
}
@@ -89,7 +90,7 @@ void StreamSourceRTPImpl::setSink(const AsteriskSCF::Media::V1::StreamSinkPrx& s
{
mImpl->mSourceStateItem->mSink = sink;
- // TODO: Send off for replication
+ mImpl->mSession->replicateState(0, 0, mImpl->mSourceStateItem);
}
/**
diff --git a/src/RTPSource.h b/src/RTPSource.h
index 16a6f7c..c6d9d4a 100644
--- a/src/RTPSource.h
+++ b/src/RTPSource.h
@@ -21,7 +21,7 @@ class StreamSourceRTPImplPriv;
class StreamSourceRTPImpl : public AsteriskSCF::Media::RTP::V1::StreamSourceRTP
{
public:
- StreamSourceRTPImpl(RTPSessionImplPtr);
+ StreamSourceRTPImpl(RTPSessionImplPtr, std::string);
void setSink(const AsteriskSCF::Media::V1::StreamSinkPrx&, const Ice::Current&);
AsteriskSCF::Media::V1::StreamSinkPrx getSink(const Ice::Current&);
AsteriskSCF::Media::V1::FormatSeq getFormats(const Ice::Current&);
-----------------------------------------------------------------------
--
asterisk-scf/integration/media_rtp_pjmedia.git
More information about the asterisk-scf-commits
mailing list