[asterisk-scf-commits] asterisk-scf/integration/media_rtp_pjmedia.git branch "modular-transport-refactor" updated.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Thu Jun 9 15:17:37 CDT 2011


branch "modular-transport-refactor" has been updated
       via  0662214b1fd282498f774bf1acd8e27124e82c1f (commit)
      from  54cd52fa82cafdb5fd23aa0ead3ab70f5f4296d3 (commit)

Summary of changes:
 src/CMakeLists.txt                 |    2 +
 src/MediaRTPpjmedia.cpp            |   27 ++-
 src/RTPConfiguration.cpp           |    9 +-
 src/RTPConfiguration.h             |    2 +-
 src/RTPSession.cpp                 |  477 ++++++++++++++++++++++++++----------
 src/RTPSession.h                   |   98 +++-----
 src/RTPSink.cpp                    |   48 ++--
 src/RTPSink.h                      |    6 +-
 src/RTPSource.cpp                  |   39 ++--
 src/RTPSource.h                    |    6 +-
 src/ReplicationAdapter.h           |   43 ++++
 src/RtpStateReplicator.h           |   10 +-
 src/RtpStateReplicatorListener.cpp |   54 +++--
 src/SessionAdapter.h               |   64 +++++
 14 files changed, 601 insertions(+), 284 deletions(-)
 create mode 100644 src/ReplicationAdapter.h
 create mode 100644 src/SessionAdapter.h


- Log -----------------------------------------------------------------
commit 0662214b1fd282498f774bf1acd8e27124e82c1f
Author: Brent Eagles <beagles at digium.com>
Date:   Thu Jun 9 17:28:10 2011 -0230

    This step strives to break apart the physical dependencies between the session
    implementation and the sources and sinks. A similar decoupling occurs in the
    state replication listener.
    
    This is the first step in preparing the major parts to work with a generic
    transport interface instead of including UDP specific details directly in the
    servants. In the end RTPSink and RTPSource will be able to support the
    requirements as laid out in Slice for any of the supported media transport
    types without any special knowledge of those transports.
    
    While this may look like a set of random and academic changes, I found that
    integrating the ICE transport implementation made things pretty messy or
    resulted in a lot of code duplication. I would prefer to keep the UDP, ICE and
    SRTP code as "isolated" from each other as possible but keep duplication to a
    minimum. Reorganizing the code a bit helps do that.
    
    Changes of note:
    
     - The RTPSessionImpl is now hidden in the RTPSession.cpp file, removing the
       need for an RTPSessionImplPriv data class and enforcing the decoupling between
       the various servants.
    
     - Adapter interfaces are defined, implemented and used as necessary to
       "formalize" the required interchange and make it easier to control the
       "inter-play". These are the SessionAdapter and ReplicationAdapter interfaces.
    
     - I've tried to remember to add XXX comments where I've added lines that will
       be removed in the upcoming changes.

diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 0669536..d0fb862 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -21,6 +21,8 @@ asterisk_scf_component_add_file(media_rtp_pjmedia PJLibConfiguration.h)
 asterisk_scf_component_add_file(media_rtp_pjmedia PJMediaEnvironment.cpp)
 asterisk_scf_component_add_file(media_rtp_pjmedia PJMediaEnvironment.h)
 asterisk_scf_component_add_file(media_rtp_pjmedia PJUtil.h)
+asterisk_scf_component_add_file(media_rtp_pjmedia ReplicationAdapter.h)
+asterisk_scf_component_add_file(media_rtp_pjmedia SessionAdapter.h)
 
 asterisk_scf_component_add_slice(media_rtp_pjmedia ../local-slice/RtpStateReplicationIf.ice)
 asterisk_scf_component_add_slice(media_rtp_pjmedia ../local-slice/RtpConfigurationIf.ice)
diff --git a/src/MediaRTPpjmedia.cpp b/src/MediaRTPpjmedia.cpp
index a74c0f8..6dd0f2c 100644
--- a/src/MediaRTPpjmedia.cpp
+++ b/src/MediaRTPpjmedia.cpp
@@ -67,7 +67,7 @@ static const string MediaComparatorServiceId("RTPMediaServiceComparator");
 class RTPMediaServiceImpl : public RTPMediaService
 {
 public:
-    RTPMediaServiceImpl(const Ice::ObjectAdapterPtr&, const ReplicaPtr&,
+    RTPMediaServiceImpl(const Ice::ObjectAdapterPtr&, const ReplicaPrx&,
 	const AsteriskSCF::Discovery::SmartProxy<RtpStateReplicatorPrx>&,
 	const ConfigurationServiceImplPtr&);
     RTPSessionPrx allocate(const RTPServiceLocatorParamsPtr&, const Ice::Current&);
@@ -89,9 +89,9 @@ private:
     PJMediaEnvironmentPtr mEnvironment;
 
     /**
-     * A pointer to the replica service.
+     * A proxy for the replica service
      */
-    ReplicaPtr mReplicaService;
+    ReplicaPrx mReplicaServicePrx;
 
     /**
      * A pointer to the configuration service.
@@ -247,6 +247,11 @@ private:
     ReplicaPtr mReplicaService;
 
     /**
+     * A proxy to the replica control object.
+     */
+    ReplicaPrx mReplicaServicePrx;
+
+    /**
      * Instance of our configuration service implementation.
      */
     ConfigurationServiceImplPtr mConfigurationService;
@@ -421,11 +426,11 @@ typedef IceUtil::Handle<RtpConfigurationCompare> RtpConfigurationComparePtr;
 /**
  * Constructor for the RTPMediaServiceImpl class.
  */
-RTPMediaServiceImpl::RTPMediaServiceImpl(const Ice::ObjectAdapterPtr& adapter, const ReplicaPtr& replicaService,
+RTPMediaServiceImpl::RTPMediaServiceImpl(const Ice::ObjectAdapterPtr& adapter, const ReplicaPrx& replicaService,
     const AsteriskSCF::Discovery::SmartProxy<RtpStateReplicatorPrx>& stateReplicator,
     const ConfigurationServiceImplPtr& configurationService) :
     mAdapter(adapter), mEnvironment(PJMediaEnvironment::create(adapter->getCommunicator()->getProperties(), "")),
-    mReplicaService(replicaService), mConfigurationService(configurationService),
+    mReplicaServicePrx(replicaService), mConfigurationService(configurationService),
     mStateReplicator(stateReplicator)
 {
 }
@@ -435,10 +440,8 @@ RTPMediaServiceImpl::RTPMediaServiceImpl(const Ice::ObjectAdapterPtr& adapter, c
  */
 RTPSessionPrx RTPMediaServiceImpl::allocate(const RTPServiceLocatorParamsPtr& params, const Ice::Current&)
 {
-    RTPSessionImplPtr session =
-        new RTPSessionImpl(mAdapter, params, mEnvironment->poolFactory(), mReplicaService, mStateReplicator,
-	    mConfigurationService);
-    return session->getProxy();
+    return AsteriskSCF::PJMediaRTP::RTPSession::create(mAdapter, IceUtil::generateUUID(), params, mEnvironment,
+            mReplicaServicePrx, mStateReplicator, mConfigurationService);
 }
 
 /**
@@ -479,7 +482,7 @@ void MediaRTPpjmediaApp::start(const std::string&, const Ice::CommunicatorPtr& c
     mLocalAdapter = mCommunicator->createObjectAdapter("MediaRTPpjmediaAdapterLocal");
 
     mReplicaService = new ReplicaImpl(mLocalAdapter);
-    mLocalAdapter->add(mReplicaService, mCommunicator->stringToIdentity(ReplicaServiceId));
+    mReplicaServicePrx = ReplicaPrx::uncheckedCast(mLocalAdapter->add(mReplicaService, mCommunicator->stringToIdentity(ReplicaServiceId)));
 
     mConfigurationService = new ConfigurationServiceImpl();
     ConfigurationServicePrx mConfigurationServiceProxy = ConfigurationServicePrx::uncheckedCast(
@@ -515,7 +518,7 @@ void MediaRTPpjmediaApp::start(const std::string&, const Ice::CommunicatorPtr& c
     }
 
     RTPMediaServiceImplPtr rtpmediaservice =
-        new RTPMediaServiceImpl(mGlobalAdapter, mReplicaService, mStateReplicator, mConfigurationService);
+        new RTPMediaServiceImpl(mGlobalAdapter, mReplicaServicePrx, mStateReplicator, mConfigurationService);
 
     if (mCommunicator->getProperties()->getPropertyWithDefault("Rtp.Standalone", "false") == "true")
     {
@@ -547,7 +550,7 @@ void MediaRTPpjmediaApp::start(const std::string&, const Ice::CommunicatorPtr& c
     if (mStateReplicator)
     {
         mReplicatorListener =
-            new RtpStateReplicatorListenerI(mGlobalAdapter, rtpmediaservice->getPoolFactory(), mGeneralState,
+            new RtpStateReplicatorListenerI(mGlobalAdapter, rtpmediaservice->getEnvironment(), mGeneralState,
 		mConfigurationService);
         mReplicatorListenerProxy =
             RtpStateReplicatorListenerPrx::uncheckedCast(mLocalAdapter->addWithUUID(mReplicatorListener));
diff --git a/src/RTPConfiguration.cpp b/src/RTPConfiguration.cpp
index b0a788b..aa23f95 100644
--- a/src/RTPConfiguration.cpp
+++ b/src/RTPConfiguration.cpp
@@ -14,16 +14,15 @@
  * at the top of the source tree.
  */
 
+#include "RtpConfigurationIf.h"
+#include "RTPConfiguration.h"
+
 #include <IceUtil/UUID.h>
+#include <AsteriskSCF/System/Component/ConfigurationIf.h>
 
 #include <boost/thread.hpp>
-#include <boost/shared_ptr.hpp>
 #include <boost/thread/shared_mutex.hpp>
 
-#include <AsteriskSCF/System/Component/ConfigurationIf.h>
-
-#include "RtpConfigurationIf.h"
-#include "RTPConfiguration.h"
 
 using namespace AsteriskSCF::System::Configuration::V1;
 using namespace AsteriskSCF::Media::RTP::V1;
diff --git a/src/RTPConfiguration.h b/src/RTPConfiguration.h
index af1a978..b6fc401 100644
--- a/src/RTPConfiguration.h
+++ b/src/RTPConfiguration.h
@@ -17,8 +17,8 @@
 #pragma once
 
 #include <Ice/Ice.h>
-
 #include <AsteriskSCF/System/Component/ConfigurationIf.h>
+#include <boost/shared_ptr.hpp>
 
 /*
  * Private implementation class for ConfigurationServiceImpl.
diff --git a/src/RTPSession.cpp b/src/RTPSession.cpp
index 641e5c8..64aa92a 100644
--- a/src/RTPSession.cpp
+++ b/src/RTPSession.cpp
@@ -14,6 +14,12 @@
  * at the top of the source tree.
  */
 
+#include "RTPSession.h"
+#include "RtpStateReplicationIf.h"
+#include "RTPSource.h"
+#include "RTPSink.h"
+#include "RTPConfiguration.h"
+
 #include <pjlib.h>
 #include <pjmedia.h>
 
@@ -24,44 +30,94 @@
 #include <AsteriskSCF/Media/RTP/MediaRTPIf.h>
 #include <AsteriskSCF/System/Component/ReplicaIf.h>
 
-#include "RtpStateReplicationIf.h"
-
-#include "RTPSession.h"
-#include "RTPSource.h"
-#include "RTPSink.h"
-
-#include "RTPConfiguration.h"
-
 using namespace std;
 using namespace AsteriskSCF::Core::Discovery::V1;
 using namespace AsteriskSCF::Media::V1;
 using namespace AsteriskSCF::Media::RTP::V1;
 using namespace AsteriskSCF::System::Component::V1;
 using namespace AsteriskSCF::Discovery;
+using namespace AsteriskSCF::PJMediaRTP;
 
 /**
- * Default value for where we should start allocating RTP and RTCP ports from.
- */
-#define DEFAULT_RTP_PORT_MINIMUM 10000
-
-/**
- * Default value for where we should stop allocating RTP and RTCP ports.
+ * Implementation of the RTPSession interface as defined in MediaRTPIf.ice
  */
-#define DEFAULT_RTP_PORT_MAXIMUM 20000
-
-/**
- * Private implementation details for the RTPSessionImpl class.
- */
-class RTPSessionImplPriv
+class RTPSessionImpl : public AsteriskSCF::Media::RTP::V1::RTPSession
 {
 public:
-    RTPSessionImplPriv(const Ice::ObjectAdapterPtr& adapter, const FormatSeq& formats,
-	const ReplicaPtr& replicaService,
-	const AsteriskSCF::Discovery::SmartProxy<RtpStateReplicatorPrx>& stateReplicator) :
-	mAdapter(adapter), mFormats(formats),
-        mSessionStateItem(new RtpSessionStateItem()),
-        mReplicaService(replicaService), mStateReplicator(stateReplicator) { };
-    ~RTPSessionImplPriv();
+    RTPSessionImpl(const Ice::ObjectAdapterPtr&, 
+            const string& id,
+            const PJMediaEnvironmentPtr& env,
+            const AsteriskSCF::Media::RTP::V1::RTPServiceLocatorParamsPtr& params,
+            const AsteriskSCF::System::Component::V1::ReplicaPrx& replicaControl, 
+	    const AsteriskSCF::Discovery::SmartProxy<AsteriskSCF::Media::RTP::V1::RtpStateReplicatorPrx>&,
+	    const ConfigurationServiceImplPtr&);
+    
+    RTPSessionImpl(const Ice::ObjectAdapterPtr& adapter,
+            const string& sessionIdentity,    
+            const PJMediaEnvironmentPtr& env,
+            Ice::Int port,
+            const AsteriskSCF::Media::V1::FormatSeq& formats,
+            bool isIPv6,
+            const AsteriskSCF::Discovery::SmartProxy<AsteriskSCF::Media::RTP::V1::RtpStateReplicatorPrx>&,
+            const ConfigurationServiceImplPtr& configurationServant);
+
+    ~RTPSessionImpl();
+
+    /**
+     * AsteriskSCF::Media::V1::Session implementation.
+     */
+    AsteriskSCF::Media::V1::StreamSourceSeq getSources(const Ice::Current&);
+    AsteriskSCF::Media::V1::StreamSinkSeq getSinks(const Ice::Current&);
+    std::string getId(const Ice::Current&);
+    void useRTCP(bool, const Ice::Current&);
+    AsteriskSCF::Media::RTP::V1::RTCPSessionPrx getRTCPSession(const Ice::Current&);
+    void release(const Ice::Current&);
+    void associatePayloads(const AsteriskSCF::Media::RTP::V1::PayloadMap&, const Ice::Current&);
+
+    /**
+     * Internal methods. 
+     */
+    pjmedia_transport* getTransport();
+    AsteriskSCF::Media::V1::FormatSeq getFormats();
+    void setRemoteDetails(const std::string& address, Ice::Int port);
+    AsteriskSCF::Media::V1::FormatPtr getFormat(int payload);
+    int getPayload(const AsteriskSCF::Media::V1::FormatPtr& mediaformat);
+
+    /**
+     * Accessors for the source and sink servants. The source servant is returned through a pointer to the
+     * implementation type because some implementation specific details need to be exposed.
+     */
+    StreamSourceRTPImplPtr getSourceServant();
+    AsteriskSCF::Media::RTP::V1::StreamSinkRTPPtr getSinkServant();
+
+    void replicateState(const AsteriskSCF::Media::RTP::V1::RtpSessionStateItemPtr&,
+            const AsteriskSCF::Media::RTP::V1::RtpStreamSinkStateItemPtr&,
+            const AsteriskSCF::Media::RTP::V1::RtpStreamSourceStateItemPtr&);
+    void removeState(const AsteriskSCF::Media::RTP::V1::RtpSessionStateItemPtr&,
+            const AsteriskSCF::Media::RTP::V1::RtpStreamSinkStateItemPtr&,
+            const AsteriskSCF::Media::RTP::V1::RtpStreamSourceStateItemPtr&);
+
+    void associatePayloadsImpl(const AsteriskSCF::Media::RTP::V1::PayloadMap& payloadMap);
+
+    RTPSessionPrx activate(const string& id);
+    RTPSessionPrx activate(const Ice::Identity& id, const Ice::Identity& sourceId, const Ice::Identity& sinkId);
+    
+private:
+
+    /**
+     * Instance of the session adapter to be passed to sinks, sources, configuration, etc.
+     */
+    boost::shared_ptr<SessionAdapter> mSessionAdapter;
+
+    /**
+     * The current pjmedia related environment.
+     */
+    PJMediaEnvironmentPtr mEnvironment;
+
+    /**
+     * Id.
+     */
+    string mId;
 
     /**
      * A pointer to the object adapter that objects should be added to.
@@ -74,11 +130,6 @@ public:
     FormatSeq mFormats;
 
     /**
-     * A proxy to ourselves.
-     */
-    RTPSessionPrx mProxy;
-
-    /**
      * pjmedia endpoint for our media.
      */
     pjmedia_endpt* mEndpoint;
@@ -119,9 +170,9 @@ public:
     RtpSessionStateItemPtr mSessionStateItem;
 
     /**
-     * A pointer to the replica instance.
+     * A proxy to our related replica control object.
      */
-    ReplicaPtr mReplicaService;
+    ReplicaPrx mReplicaService;
 
     /**
      * A proxy to the state replicator where we are sending updates to.
@@ -130,19 +181,95 @@ public:
 };
 
 /**
- * Constructor for the RTPSessionImpl class (used by Ice).
+ * A typedef which creates a smart pointer type for RTPSessionImpl.
+ */
+typedef IceUtil::Handle<RTPSessionImpl> RTPSessionImplPtr;
+
+
+/**
+ * Default value for where we should start allocating RTP and RTCP ports from.
+ */
+#define DEFAULT_RTP_PORT_MINIMUM 10000
+
+/**
+ * Default value for where we should stop allocating RTP and RTCP ports.
  */
-RTPSessionImpl::RTPSessionImpl(const Ice::ObjectAdapterPtr& adapter, const RTPServiceLocatorParamsPtr& params,
-    pj_pool_factory* factory, const ReplicaPtr& replicaService,
-    const AsteriskSCF::Discovery::SmartProxy<RtpStateReplicatorPrx>& stateReplicator,
-    const ConfigurationServiceImplPtr& configurationService) : 
-    mImpl(new RTPSessionImplPriv(adapter, params->formats, replicaService, stateReplicator))
+#define DEFAULT_RTP_PORT_MAXIMUM 20000
+
+//
+// Provides an adapter to the session implementation without having to expose the entire header file. This is a
+// step-wise decoupling of the session implementation from the other objects in this component. The whole
+// thing will make a bit more sense in the following updates.
+//
+class SessionAdapterImpl : public SessionAdapter
 {
-    /* Add ourselves to the ICE ASM so we can be used. */
-    mImpl->mProxy = RTPSessionPrx::uncheckedCast(adapter->addWithUUID(this));
+public:
+    SessionAdapterImpl(const RTPSessionImplPtr& sessionServant) :
+        mServant(sessionServant)
+    {
+    }
 
+        void replicateState(const AsteriskSCF::Media::RTP::V1::RtpStreamSinkStateItemPtr& sinkStateItem) 
+    {
+        mServant->replicateState(0, sinkStateItem, 0);
+    }
+
+    void replicateState(const AsteriskSCF::Media::RTP::V1::RtpStreamSourceStateItemPtr& sourceStateItem)
+    {
+        mServant->replicateState(0, 0, sourceStateItem);
+    }
+
+    AsteriskSCF::Media::V1::FormatPtr getFormat(int payload) 
+    {
+        return mServant->getFormat(payload);
+    }
+
+    int getPayload(const AsteriskSCF::Media::V1::FormatPtr& format) 
+    {
+        return mServant->getPayload(format);
+    }
+
+    AsteriskSCF::Media::V1::FormatSeq getFormats()
+    {
+        return mServant->getFormats();
+    }
+
+    void setRemoteDetails(const std::string& host, int port)
+    {
+        mServant->setRemoteDetails(host, port);
+    }
+
+    pjmedia_transport* getTransport()
+    {
+        return mServant->getTransport();
+    }
+
+private:
+    RTPSessionImplPtr mServant;
+};
+
+/**
+ * Constructor for the RTPSessionImpl class (used by Ice).
+ */
+RTPSessionImpl::RTPSessionImpl(const Ice::ObjectAdapterPtr& adapter,
+        const string& id,
+        const PJMediaEnvironmentPtr& env,
+        const RTPServiceLocatorParamsPtr& params,
+        const ReplicaPrx& replicaService,
+        const AsteriskSCF::Discovery::SmartProxy<RtpStateReplicatorPrx>& stateReplicator,
+        const ConfigurationServiceImplPtr& configurationService) : 
+    mSessionAdapter(new SessionAdapterImpl(this)),
+    mEnvironment(env),
+    mId(id),
+    mAdapter(adapter),
+    mFormats(params->formats),
+    mSessionStateItem(new RtpSessionStateItem),
+    mReplicaService(replicaService),
+    mStateReplicator(stateReplicator)
+{
     /* Create an endpoint in pjmedia for our media. */
-    pj_status_t status = pjmedia_endpt_create(factory, NULL, configurationService->getWorkerThreadCount(), &mImpl->mEndpoint);
+    pj_status_t status = pjmedia_endpt_create(mEnvironment->poolFactory(), 0, configurationService->getWorkerThreadCount(),
+            &mEndpoint);
 
     assert(status == PJ_SUCCESS);
 
@@ -152,12 +279,14 @@ RTPSessionImpl::RTPSessionImpl(const Ice::ObjectAdapterPtr& adapter, const RTPSe
     if (params->ipv6 == true)
     {
 	af = pj_AF_INET6();
-	pj_strset(&binding, (char*)(configurationService->getBindIPv6address().c_str()), (configurationService->getBindIPv6address().size()));
+	pj_strset(&binding, (char*)(configurationService->getBindIPv6address().c_str()),
+                (configurationService->getBindIPv6address().size()));
     }
     else
     {
 	af = pj_AF_INET();
-        pj_strset(&binding, (char*)(configurationService->getBindIPv4address().c_str()), (configurationService->getBindIPv4address().size()));
+        pj_strset(&binding, (char*)(configurationService->getBindIPv4address().c_str()),
+                (configurationService->getBindIPv4address().size()));
     }
 
     int minimumPort = configurationService->getStartPort();
@@ -166,52 +295,58 @@ RTPSessionImpl::RTPSessionImpl(const Ice::ObjectAdapterPtr& adapter, const RTPSe
     /* Now create some transport we can use to actually send or receive the media. */
     for (int port = minimumPort; port < maximumPort; port += 2)
     {
-        if ((status = pjmedia_transport_udp_create3(mImpl->mEndpoint, af, "RTP", pj_strlen(&binding) ? &binding : NULL, port, 0, &mImpl->mTransport)) ==
+        if ((status = pjmedia_transport_udp_create3(mEndpoint, af, "RTP", pj_strlen(&binding) ? &binding : NULL, port, 0, &mTransport)) ==
                 PJ_SUCCESS)
         {
-	    mImpl->mSessionStateItem->mPort = port;
+	    mSessionStateItem->mPort = port;
             break;
         }
     }
 
     // Initialize our session state item enough so that the state items for the source and sink can also be initialized.
-    mImpl->mSessionStateItem->key = mImpl->mSessionStateItem->mSessionId = IceUtil::generateUUID();
-    mImpl->mSessionStateItem->mSessionIdentity = mImpl->mProxy->ice_getIdentity();
-    mImpl->mSessionStateItem->mFormats = params->formats;
-    mImpl->mSessionStateItem->mIPv6 = params->ipv6;
+    mSessionStateItem->key = mSessionStateItem->mSessionId = IceUtil::generateUUID();
+    mSessionStateItem->mSessionIdentity = mAdapter->getCommunicator()->stringToIdentity(mId);
+    mSessionStateItem->mFormats = params->formats;
+    mSessionStateItem->mIPv6 = params->ipv6;
 
     /* First up for our own stuff is... a source! Media needs to come from somewhere, you know. */
-    mImpl->mStreamSource = new StreamSourceRTPImpl(this, mImpl->mSessionStateItem->key);
-    mImpl->mStreamSourceProxy = StreamSourceRTPPrx::uncheckedCast(mImpl->mAdapter->addWithUUID(mImpl->mStreamSource));
-    mImpl->mSessionStateItem->mSourceIdentity = mImpl->mStreamSourceProxy->ice_getIdentity();
-
+    mStreamSource = new StreamSourceRTPImpl(mSessionAdapter, mSessionStateItem->key);
     /* And for my next trick a place for us to send media out. */
-    mImpl->mStreamSink = new StreamSinkRTPImpl(this, mImpl->mSessionStateItem->key);
-    mImpl->mStreamSinkProxy = StreamSinkRTPPrx::uncheckedCast(mImpl->mAdapter->addWithUUID(mImpl->mStreamSink));
-    mImpl->mSessionStateItem->mSinkIdentity = mImpl->mStreamSinkProxy->ice_getIdentity();
+    mStreamSink = new StreamSinkRTPImpl(mSessionAdapter, mSessionStateItem->key);
 
-    // Since everything has just come into creation send one big update with all state items
-    replicateState(mImpl->mSessionStateItem, mImpl->mStreamSink->getStateItem(), mImpl->mStreamSource->getStateItem());
+    //
+    // All of this new stuff will be replicated on activation.
+    //
 }
 
 /**
  * Constructor for the RTPSessionImpl class (used by state replicator).
  */
-RTPSessionImpl::RTPSessionImpl(const Ice::ObjectAdapterPtr& adapter, pj_pool_factory* factory,
-    const Ice::Identity& sessionIdentity, const Ice::Identity& sinkIdentity, const Ice::Identity& sourceIdentity,
-    Ice::Int port, const FormatSeq& formats, bool ipv6, const ConfigurationServiceImplPtr& configurationService) :
-    mImpl(new RTPSessionImplPriv(adapter, formats, 0, *(new AsteriskSCF::Discovery::SmartProxy<RtpStateReplicatorPrx>)))
+RTPSessionImpl::RTPSessionImpl(const Ice::ObjectAdapterPtr& adapter,
+        const string& sessionIdentity,
+        const PJMediaEnvironmentPtr& env,
+        Ice::Int port,
+        const FormatSeq& formats,
+        bool ipv6,
+        const AsteriskSCF::Discovery::SmartProxy<RtpStateReplicatorPrx>& replicatorPrx,
+        const ConfigurationServiceImplPtr& configurationService) :
+    mSessionAdapter(new SessionAdapterImpl(this)),
+    mEnvironment(env),
+    mId(sessionIdentity),
+    mAdapter(adapter),
+    mFormats(formats),
+    mSessionStateItem(0),
+    mReplicaService(0),
+    mStateReplicator(replicatorPrx)
 {
-    mImpl->mProxy = RTPSessionPrx::uncheckedCast(adapter->add(this, sessionIdentity));
-
-    pj_status_t status = pjmedia_endpt_create(factory, NULL, configurationService->getWorkerThreadCount(), &mImpl->mEndpoint);
+    pj_status_t status = pjmedia_endpt_create(mEnvironment->poolFactory(), 0, configurationService->getWorkerThreadCount(), &mEndpoint);
 
     assert(status == PJ_SUCCESS);
 
     int af;
     pj_str_t binding;
 
-    if (ipv6 == true)
+    if (ipv6)
     {
         af = pj_AF_INET6();
         pj_strset(&binding, (char*)(configurationService->getBindIPv6address().c_str()), (configurationService->getBindIPv6address().size()));
@@ -222,24 +357,20 @@ RTPSessionImpl::RTPSessionImpl(const Ice::ObjectAdapterPtr& adapter, pj_pool_fac
         pj_strset(&binding, (char*)(configurationService->getBindIPv4address().c_str()), (configurationService->getBindIPv4address().size()));
     }
 
-    if ((status = pjmedia_transport_udp_create3(mImpl->mEndpoint, af, "RTP", pj_strlen(&binding) ? &binding : NULL, port, 0, &mImpl->mTransport))
+    if ((status = pjmedia_transport_udp_create3(mEndpoint, af, "RTP", pj_strlen(&binding) ? &binding : NULL, port, 0, &mTransport))
             != PJ_SUCCESS)
     {
 	// TODO: This is also bad, something is using the port
     }
 
-    mImpl->mStreamSource = new StreamSourceRTPImpl(this, "");
-    mImpl->mStreamSourceProxy =
-        StreamSourceRTPPrx::uncheckedCast(mImpl->mAdapter->add(mImpl->mStreamSource, sourceIdentity));
-
-    mImpl->mStreamSink = new StreamSinkRTPImpl(this, "");
-    mImpl->mStreamSinkProxy = StreamSinkRTPPrx::uncheckedCast(mImpl->mAdapter->add(mImpl->mStreamSink, sinkIdentity));
+    mStreamSource = new StreamSourceRTPImpl(mSessionAdapter, mId);
+    mStreamSink = new StreamSinkRTPImpl(mSessionAdapter, mId);
 }
 
 /**
  * Destructor for the RTPSessionImplPriv class.
  */
-RTPSessionImplPriv::~RTPSessionImplPriv()
+RTPSessionImpl::~RTPSessionImpl()
 {
     /* Discontinue the media transport. */
     pjmedia_transport_close(mTransport);
@@ -255,7 +386,7 @@ StreamSourceSeq RTPSessionImpl::getSources(const Ice::Current&)
 {
     /* We only support one stream source per RTP session right now, so just return the one. */
     StreamSourceSeq sources;
-    sources.push_back(mImpl->mStreamSourceProxy);
+    sources.push_back(mStreamSourceProxy);
     return sources;
 }
 
@@ -266,7 +397,7 @@ StreamSinkSeq RTPSessionImpl::getSinks(const Ice::Current&)
 {
     /* We only support one stream sink per RTP session right now, so just return the one. */
     StreamSinkSeq sinks;
-    sinks.push_back(mImpl->mStreamSinkProxy);
+    sinks.push_back(mStreamSinkProxy);
     return sinks;
 }
 
@@ -278,7 +409,7 @@ std::string RTPSessionImpl::getId(const Ice::Current&)
     /* Instead of having to maintain our own identifier for this RTP session we can use the identifier
      * created when it was added to the ICE ASM.
      */
-    return mImpl->mProxy->ice_getIdentity().name;
+    return mId;
 }
 
 /**
@@ -303,22 +434,22 @@ RTCPSessionPrx RTPSessionImpl::getRTCPSession(const Ice::Current&)
 void RTPSessionImpl::release(const Ice::Current&)
 {
     // Remove everything from the state replicator if present
-    removeState(mImpl->mSessionStateItem, mImpl->mStreamSink->getStateItem(), mImpl->mStreamSource->getStateItem());
+    removeState(mSessionStateItem, mStreamSink->getStateItem(), mStreamSource->getStateItem());
 
     /* Drop the source and sink from the ASM */
-    mImpl->mAdapter->remove(mImpl->mStreamSourceProxy->ice_getIdentity());
-    mImpl->mAdapter->remove(mImpl->mStreamSinkProxy->ice_getIdentity());
+    mAdapter->remove(mStreamSourceProxy->ice_getIdentity());
+    mAdapter->remove(mStreamSinkProxy->ice_getIdentity());
 
     /* Since both the source and sink have a pointer back to the session we need to get rid of them,
      * which will in turn get rid of ourselves once we are removed from the ASM.
      */
-    mImpl->mStreamSource = 0;
-    mImpl->mStreamSink = 0;
+    mStreamSource = 0;
+    mStreamSink = 0;
 
     /* All we have to do is remove ourselves from the ASM, our smart pointerness will cause us to
      * destruct and then cleanup will occur.
      */
-    mImpl->mAdapter->remove(mImpl->mProxy->ice_getIdentity());
+    mAdapter->remove(mAdapter->getCommunicator()->stringToIdentity(mId));
 }
 
 /**
@@ -326,25 +457,11 @@ void RTPSessionImpl::release(const Ice::Current&)
  */
 void RTPSessionImpl::associatePayloads(const AsteriskSCF::Media::RTP::V1::PayloadMap& mappings, const Ice::Current&)
 {
-    mImpl->mSessionStateItem->mPayloadstoFormats = mappings;
-
-    for (PayloadMap::const_iterator it = mappings.begin(); it != mappings.end(); ++it)
-    {
-        mImpl->mFormatstoPayloads.insert(make_pair((*it).second->name, (*it).first));
-    }
+    mSessionStateItem->mPayloadstoFormats = mappings;
+    associatePayloadsImpl(mappings);
 
     // Only the session has changed so push a single update out for it
-    replicateState(mImpl->mSessionStateItem, 0, 0);
-}
-
-/**
- * API call which returns a proxy to this RTP session.
- *
- * @return A proxy to this RTP session.
- */
-RTPSessionPrx RTPSessionImpl::getProxy()
-{
-    return mImpl->mProxy;
+    replicateState(mSessionStateItem, 0, 0);
 }
 
 /**
@@ -354,7 +471,7 @@ RTPSessionPrx RTPSessionImpl::getProxy()
  */
 pjmedia_transport* RTPSessionImpl::getTransport()
 {
-    return mImpl->mTransport;
+    return mTransport;
 }
 
 /**
@@ -364,7 +481,7 @@ pjmedia_transport* RTPSessionImpl::getTransport()
  */
 FormatSeq RTPSessionImpl::getFormats()
 {
-    return mImpl->mFormats;
+    return mFormats;
 }
 
 /**
@@ -372,7 +489,7 @@ FormatSeq RTPSessionImpl::getFormats()
  */
 void RTPSessionImpl::setRemoteDetails(const string& address, Ice::Int port)
 {
-    mImpl->mStreamSource->setRemoteDetails(address, port);
+    mStreamSource->setRemoteDetails(address, port);
 }
 
 /**
@@ -382,9 +499,9 @@ void RTPSessionImpl::setRemoteDetails(const string& address, Ice::Int port)
  */
 FormatPtr RTPSessionImpl::getFormat(int payload)
 {
-    PayloadMap::iterator it = mImpl->mSessionStateItem->mPayloadstoFormats.find(payload);
+    PayloadMap::iterator it = mSessionStateItem->mPayloadstoFormats.find(payload);
 
-    if (it == mImpl->mSessionStateItem->mPayloadstoFormats.end())
+    if (it == mSessionStateItem->mPayloadstoFormats.end())
     {
         return 0;
     }
@@ -399,9 +516,9 @@ FormatPtr RTPSessionImpl::getFormat(int payload)
  */
 int RTPSessionImpl::getPayload(const FormatPtr& mediaformat)
 {
-    map<string, int>::iterator it = mImpl->mFormatstoPayloads.find(mediaformat->name);
+    map<string, int>::iterator it = mFormatstoPayloads.find(mediaformat->name);
 
-    if (it == mImpl->mFormatstoPayloads.end())
+    if (it == mFormatstoPayloads.end())
     {
         return -1;
     }
@@ -409,20 +526,14 @@ int RTPSessionImpl::getPayload(const FormatPtr& mediaformat)
     return (*it).second;
 }
 
-/**
- * API call which returns a pointer to the source.
- */
-StreamSourceRTPImplPtr RTPSessionImpl::getSource()
+StreamSourceRTPImplPtr RTPSessionImpl::getSourceServant()
 {
-    return mImpl->mStreamSource;
+    return mStreamSource;
 }
 
-/**
- * API call which returns a pointer to the sink.
- */
-StreamSinkRTPPtr RTPSessionImpl::getSink()
+StreamSinkRTPPtr RTPSessionImpl::getSinkServant()
 {
-    return mImpl->mStreamSink;
+    return mStreamSink;
 }
 
 /**
@@ -432,7 +543,7 @@ void RTPSessionImpl::replicateState(const RtpSessionStateItemPtr& session, const
         const RtpStreamSourceStateItemPtr& source)
 {
     // If state replication has been disabled do nothing
-    if (!mImpl->mStateReplicator || mImpl->mReplicaService->isActive() == false)
+    if (!mStateReplicator || mReplicaService->isActive() == false)
     {
 	return;
     }
@@ -461,12 +572,12 @@ void RTPSessionImpl::replicateState(const RtpSessionStateItemPtr& session, const
 
     try
     {
-	RtpStateReplicatorPrx oneway = RtpStateReplicatorPrx::uncheckedCast(mImpl->mStateReplicator->ice_oneway());
+	RtpStateReplicatorPrx oneway = RtpStateReplicatorPrx::uncheckedCast(mStateReplicator->ice_oneway());
 	oneway->setState(items);
     }
     catch (...)
     {
-	mImpl->mStateReplicator->setState(items);
+	mStateReplicator->setState(items);
     }
 }
 
@@ -477,7 +588,7 @@ void RTPSessionImpl::removeState(const RtpSessionStateItemPtr& session, const Rt
         const RtpStreamSourceStateItemPtr& source)
 {
     // If state replication has been disabled do nothing
-    if (!mImpl->mStateReplicator || mImpl->mReplicaService->isActive() == false)
+    if (!mStateReplicator || mReplicaService->isActive() == false)
     {
         return;
     }
@@ -506,11 +617,127 @@ void RTPSessionImpl::removeState(const RtpSessionStateItemPtr& session, const Rt
 
     try
     {
-	RtpStateReplicatorPrx oneway = RtpStateReplicatorPrx::uncheckedCast(mImpl->mStateReplicator->ice_oneway());
+	RtpStateReplicatorPrx oneway = RtpStateReplicatorPrx::uncheckedCast(mStateReplicator->ice_oneway());
 	oneway->removeState(items);
     }
     catch (...)
     {
-	mImpl->mStateReplicator->removeState(items);
+	mStateReplicator->removeState(items);
+    }
+}
+
+void RTPSessionImpl::associatePayloadsImpl(const AsteriskSCF::Media::RTP::V1::PayloadMap& mappings)
+{
+    for (PayloadMap::const_iterator it = mappings.begin(); it != mappings.end(); ++it)
+    {
+        mFormatstoPayloads.insert(make_pair((*it).second->name, (*it).first));
     }
 }
+
+RTPSessionPrx RTPSessionImpl::activate(const string& id)
+{
+    assert(id == mId);
+    Ice::Identity sourceId(mAdapter->getCommunicator()->stringToIdentity(IceUtil::generateUUID()));
+    Ice::Identity sinkId(mAdapter->getCommunicator()->stringToIdentity(IceUtil::generateUUID()));
+    return activate(mAdapter->getCommunicator()->stringToIdentity(id), sourceId, sinkId);
+}
+
+RTPSessionPrx RTPSessionImpl::activate(const Ice::Identity& id, const Ice::Identity& sourceId,
+        const Ice::Identity& sinkId)
+{
+    mStreamSourceProxy = StreamSourceRTPPrx::uncheckedCast(mAdapter->add(mStreamSource, sourceId));
+    mStreamSinkProxy = StreamSinkRTPPrx::uncheckedCast(mAdapter->add(mStreamSink, sinkId));
+
+    if (mSessionStateItem)
+    {
+        mSessionStateItem->key = mSessionStateItem->mSessionId = mId;
+        mSessionStateItem->mSessionIdentity = id;
+        mSessionStateItem->mFormats = mFormats;
+        mSessionStateItem->mSourceIdentity = sourceId;
+        mSessionStateItem->mSinkIdentity = sinkId;
+
+        //
+        // XXX- this will be replaced in following updates.
+        //
+        pjmedia_transport_info transportInfo;
+        pjmedia_transport_info_init(&transportInfo);
+        pjmedia_transport_get_info(mTransport, &transportInfo);
+        mSessionStateItem->mPort = pj_sockaddr_get_port(&transportInfo.sock_info.rtp_addr_name);
+        replicateState(mSessionStateItem, mStreamSink->getStateItem(), mStreamSource->getStateItem());
+    }
+
+    return RTPSessionPrx::uncheckedCast(mAdapter->add(this, id));
+}
+
+
+class ReplicationAdapterImpl : public ReplicationAdapter
+{
+public:
+
+    void update(const RtpSessionStateItemPtr& item)
+    {
+        mImpl->associatePayloadsImpl(item->mPayloadstoFormats);
+    }
+
+    void update(const RtpStreamSinkStateItemPtr& item)
+    {
+        //
+        // XXX Using the slice defined methods will be changed in subsequent updates.
+        //
+        mImpl->getSinkServant()->setSource(item->mSource, Ice::Current());
+        mImpl->getSinkServant()->setRemoteDetails(item->mRemoteAddress, item->mRemotePort, Ice::Current());
+        mImpl->getSourceServant()->setRemoteDetails(item->mRemoteAddress, item->mRemotePort);
+    }
+
+    void update(const RtpStreamSourceStateItemPtr& item)
+    {
+        //
+        // XXX Using the slice defined methods will be changed in subsequent updates.
+        //
+        mImpl->getSourceServant()->setSink(item->mSink, Ice::Current());
+    }
+
+    void destroy()
+    {
+        //
+        // XXX Using the slice defined methods will be changed in subsequent updates.
+        //
+        mImpl->release(Ice::Current());
+    }
+
+    ReplicationAdapterImpl(const RTPSessionImplPtr& impl) :
+        mImpl(impl)
+    {
+    }
+private:
+
+    RTPSessionImplPtr mImpl;
+};
+
+RTPSessionPrx AsteriskSCF::PJMediaRTP::RTPSession::create(const Ice::ObjectAdapterPtr& adapter,
+        const std::string& id, const RTPServiceLocatorParamsPtr& params,
+        const PJMediaEnvironmentPtr& environment, 
+        const AsteriskSCF::System::Component::V1::ReplicaPrx& replicaControl,
+        const AsteriskSCF::Discovery::SmartProxy<RtpStateReplicatorPrx>& stateReplicator,
+        const ConfigurationServiceImplPtr& configuration)
+{
+    RTPSessionImplPtr servant(new RTPSessionImpl(adapter, id, environment, params,
+                    replicaControl, stateReplicator, configuration));
+    return servant->activate(id);
+}
+
+ReplicationAdapterPtr AsteriskSCF::PJMediaRTP::RTPSession::create(const Ice::ObjectAdapterPtr& adapter,
+  const PJMediaEnvironmentPtr& environment, 
+  const RtpSessionStateItemPtr& item,
+  const ConfigurationServiceImplPtr& configuration)
+{
+    RTPSessionImplPtr servant(new RTPSessionImpl(adapter, 
+                    adapter->getCommunicator()->identityToString(item->mSessionIdentity), 
+                    environment,
+                    item->mPort, item->mFormats, item->mIPv6,
+                    AsteriskSCF::Discovery::SmartProxy<RtpStateReplicatorPrx>(), 
+                    configuration));
+    servant->activate(item->mSessionIdentity, item-> mSourceIdentity, item->mSinkIdentity);
+    return ReplicationAdapterPtr(new ReplicationAdapterImpl(servant));
+}
+
diff --git a/src/RTPSession.h b/src/RTPSession.h
index 4ba6e86..f2ae01c 100644
--- a/src/RTPSession.h
+++ b/src/RTPSession.h
@@ -8,78 +8,40 @@
 
 #pragma once
 
-#include <boost/shared_ptr.hpp>
-#include <AsteriskSCF/Discovery/SmartProxy.h>
-
 #include "RTPConfiguration.h"
+#include "PJMediaEnvironment.h"
+#include "RtpStateReplicationIf.h"
+#include "ReplicationAdapter.h"
 
-/**
- * Forward definition for our private implementation of RTPSession.
- */
-class RTPSessionImplPriv;
-
-/**
- * Forward definition for our private implementation of StreamSinkRTP.
- */
-class StreamSinkRTPImpl;
-
-/**
- * A typedef which creates a smart pointer type for StreamSinkRTPImpl.
- */
-typedef IceUtil::Handle<StreamSinkRTPImpl> StreamSinkRTPImplPtr;
-
-/**
- * Forward definition for our private implementation of StreamSourceRTP.
- */
-class StreamSourceRTPImpl;
-
-/**
- * A typedef which creates a smart pointer type for StreamSourceRTPImpl.
- */
-typedef IceUtil::Handle<StreamSourceRTPImpl> StreamSourceRTPImplPtr;
+#include <AsteriskSCF/Media/RTP/MediaRTPIf.h>
+#include <AsteriskSCF/System/Component/ComponentServiceIf.h>
+#include <AsteriskSCF/System/Component/ReplicaIf.h>
+#include <AsteriskSCF/Discovery/SmartProxy.h>
+#include <boost/shared_ptr.hpp>
 
-/**
- * Implementation of the RTPSession interface as defined in MediaRTPIf.ice
- */
-class RTPSessionImpl : public AsteriskSCF::Media::RTP::V1::RTPSession
+namespace AsteriskSCF
+{
+namespace PJMediaRTP
+{
+class RTPSession
 {
 public:
-    RTPSessionImpl(const Ice::ObjectAdapterPtr&, const AsteriskSCF::Media::RTP::V1::RTPServiceLocatorParamsPtr&,
-	pj_pool_factory*, const AsteriskSCF::System::Component::V1::ReplicaPtr&, 
-	const AsteriskSCF::Discovery::SmartProxy<AsteriskSCF::Media::RTP::V1::RtpStateReplicatorPrx>&,
-	const ConfigurationServiceImplPtr&);
-    RTPSessionImpl(const Ice::ObjectAdapterPtr&, pj_pool_factory*, const Ice::Identity&, const Ice::Identity&,
-	const Ice::Identity&, Ice::Int, const AsteriskSCF::Media::V1::FormatSeq&, bool,
-	const ConfigurationServiceImplPtr&);
-    AsteriskSCF::Media::V1::StreamSourceSeq getSources(const Ice::Current&);
-    AsteriskSCF::Media::V1::StreamSinkSeq getSinks(const Ice::Current&);
-    std::string getId(const Ice::Current&);
-    void useRTCP(bool, const Ice::Current&);
-    AsteriskSCF::Media::RTP::V1::RTCPSessionPrx getRTCPSession(const Ice::Current&);
-    void release(const Ice::Current&);
-    AsteriskSCF::Media::RTP::V1::RTPSessionPrx getProxy();
-    pjmedia_transport* getTransport();
-    AsteriskSCF::Media::V1::FormatSeq getFormats();
-    void associatePayloads(const AsteriskSCF::Media::RTP::V1::PayloadMap&, const Ice::Current&);
-    void setRemoteDetails(const std::string& address, Ice::Int port);
-    AsteriskSCF::Media::V1::FormatPtr getFormat(int payload);
-    int getPayload(const AsteriskSCF::Media::V1::FormatPtr& mediaformat);
-    StreamSourceRTPImplPtr getSource();
-    AsteriskSCF::Media::RTP::V1::StreamSinkRTPPtr getSink();
-    void replicateState(const AsteriskSCF::Media::RTP::V1::RtpSessionStateItemPtr&,
-            const AsteriskSCF::Media::RTP::V1::RtpStreamSinkStateItemPtr&,
-            const AsteriskSCF::Media::RTP::V1::RtpStreamSourceStateItemPtr&);
-    void removeState(const AsteriskSCF::Media::RTP::V1::RtpSessionStateItemPtr&,
-            const AsteriskSCF::Media::RTP::V1::RtpStreamSinkStateItemPtr&,
-            const AsteriskSCF::Media::RTP::V1::RtpStreamSourceStateItemPtr&);
-private:
-    /**
-     * Private implementation data for RTPSessionImpl.
-     */
-    boost::shared_ptr<RTPSessionImplPriv> mImpl;
+    static AsteriskSCF::Media::RTP::V1::RTPSessionPrx create(const Ice::ObjectAdapterPtr& objectAdapter,
+            const std::string& id,
+            const AsteriskSCF::Media::RTP::V1::RTPServiceLocatorParamsPtr& params,
+            const PJMediaEnvironmentPtr& environment,
+            const AsteriskSCF::System::Component::V1::ReplicaPrx& replicaControl,
+            const AsteriskSCF::Discovery::SmartProxy<
+                AsteriskSCF::Media::RTP::V1::RtpStateReplicatorPrx>& stateReplicator,
+            const ConfigurationServiceImplPtr& configuration
+    );
+
+    static ReplicationAdapterPtr create(const Ice::ObjectAdapterPtr& objectAdapter,
+            const PJMediaEnvironmentPtr& environment,
+            const AsteriskSCF::Media::RTP::V1::RtpSessionStateItemPtr& update,
+            const ConfigurationServiceImplPtr& configuration
+    );
 };
+} /* End of namespace PJMediaRTP */
+} /* End of namespace AsteriskSCF */
 
-/**
- * A typedef which creates a smart pointer type for RTPSessionImpl.
- */
-typedef IceUtil::Handle<RTPSessionImpl> RTPSessionImplPtr;
diff --git a/src/RTPSink.cpp b/src/RTPSink.cpp
index 41e7b18..35ad87a 100644
--- a/src/RTPSink.cpp
+++ b/src/RTPSink.cpp
@@ -14,6 +14,9 @@
  * at the top of the source tree.
  */
 
+#include "RTPSink.h"
+#include "RtpStateReplicationIf.h"
+
 #include <pjlib.h>
 #include <pjmedia.h>
 
@@ -24,15 +27,12 @@
 #include <AsteriskSCF/Media/RTP/MediaRTPIf.h>
 #include <AsteriskSCF/System/Component/ReplicaIf.h>
 
-#include "RtpStateReplicationIf.h"
-
-#include "RTPSession.h"
-#include "RTPSink.h"
-
 using namespace std;
 using namespace AsteriskSCF::Core::Discovery::V1;
 using namespace AsteriskSCF::Media::V1;
 using namespace AsteriskSCF::Media::RTP::V1;
+using namespace AsteriskSCF::PJMediaRTP;
+
 
 /**
  * Private implementation details for the StreamSinkRTPImpl class.
@@ -43,7 +43,7 @@ public:
     /**
      * Constructor for our StreamSinkRTPImplPriv class.
      */
-    StreamSinkRTPImplPriv(const RTPSessionImplPtr&, const std::string&);
+    StreamSinkRTPImplPriv(const SessionAdapterPtr&, const std::string&);
 
     /**
      * A structure containing outgoing pjmedia session data.
@@ -53,19 +53,24 @@ public:
     /**
      * A pointer to the RTP session we are associated with.
      */
-    RTPSessionImplPtr mSession;
+    SessionAdapterPtr mSessionAdapter;
 
     /**
      * Stream sink state item.
      */
     RtpStreamSinkStateItemPtr mSinkStateItem;
+
+    /**
+     * The parent session's id.
+     */
+    string mSessionId;
 };
 
 /**
  * Constructor for the StreamSinkRTPImplPriv class.
  */
-StreamSinkRTPImplPriv::StreamSinkRTPImplPriv(const RTPSessionImplPtr& session, const string& sessionId) :
-    mSession(session), mSinkStateItem(new RtpStreamSinkStateItem)
+StreamSinkRTPImplPriv::StreamSinkRTPImplPriv(const SessionAdapterPtr& session, const string& sessionId) :
+    mSessionAdapter(session), mSinkStateItem(new RtpStreamSinkStateItem), mSessionId(sessionId)
 {
     pjmedia_rtp_session_init(&mOutgoingSession, 0, pj_rand());
     mSinkStateItem->mSessionId = sessionId;
@@ -76,7 +81,7 @@ StreamSinkRTPImplPriv::StreamSinkRTPImplPriv(const RTPSessionImplPtr& session, c
 /**
  * Constructor for the StreamSinkRTPImpl class.
  */
-StreamSinkRTPImpl::StreamSinkRTPImpl(const RTPSessionImplPtr& session, const string& sessionId) :
+StreamSinkRTPImpl::StreamSinkRTPImpl(const SessionAdapterPtr& session, const string& sessionId) :
     mImpl(new StreamSinkRTPImplPriv(session, sessionId))
 {
 }
@@ -107,15 +112,16 @@ void StreamSinkRTPImpl::write(const AsteriskSCF::Media::V1::FrameSeq& frames, co
         int payload;
 
         // Only allow media formats through that we support
-        if ((payload = mImpl->mSession->getPayload((*frame)->mediaformat)) < 0)
+        if ((payload = mImpl->mSessionAdapter->getPayload((*frame)->mediaformat)) < 0)
         {
             throw UnsupportedMediaFormatException();
         }
 
         /* Using the available information construct an RTP header that we can place at the front of our packet */
         pj_status_t status = pjmedia_rtp_encode_rtp(&mImpl->mOutgoingSession,
-						    mImpl->mSession->getPayload((*frame)->mediaformat), 0, (int) (*frame)->payload.size(),
-						    (int) (*frame)->payload.size(), &header, &header_len);
+                mImpl->mSessionAdapter->getPayload((*frame)->mediaformat), 0,
+                (int) (*frame)->payload.size(),
+                (int) (*frame)->payload.size(), &header, &header_len);
 
         if (status != PJ_SUCCESS)
         {
@@ -133,7 +139,7 @@ void StreamSinkRTPImpl::write(const AsteriskSCF::Media::V1::FrameSeq& frames, co
         pj_memcpy(packet + header_len, &(*frame)->payload[0], (*frame)->payload.size());
 
         /* All done, transmission can now occur */
-        status = pjmedia_transport_send_rtp(mImpl->mSession->getTransport(), packet,
+        status = pjmedia_transport_send_rtp(mImpl->mSessionAdapter->getTransport(), packet,
                 (*frame)->payload.size() + header_len);
 
         if (status != PJ_SUCCESS)
@@ -151,7 +157,7 @@ void StreamSinkRTPImpl::setSource(const AsteriskSCF::Media::V1::StreamSourcePrx&
 {
     mImpl->mSinkStateItem->mSource = source;
 
-    mImpl->mSession->replicateState(0, mImpl->mSinkStateItem, 0);
+    mImpl->mSessionAdapter->replicateState(mImpl->mSinkStateItem);
 }
 
 /**
@@ -167,7 +173,7 @@ AsteriskSCF::Media::V1::StreamSourcePrx StreamSinkRTPImpl::getSource(const Ice::
  */
 AsteriskSCF::Media::V1::FormatSeq StreamSinkRTPImpl::getFormats(const Ice::Current&)
 {
-    return mImpl->mSession->getFormats();
+    return mImpl->mSessionAdapter->getFormats();
 }
 
 /**
@@ -176,7 +182,7 @@ AsteriskSCF::Media::V1::FormatSeq StreamSinkRTPImpl::getFormats(const Ice::Curre
 std::string StreamSinkRTPImpl::getId(const Ice::Current& current)
 {
     /* For now utilize the id of the session */
-    return mImpl->mSession->getId(current);
+    return mImpl->mSessionId;
 }
 
 /**
@@ -187,7 +193,7 @@ void StreamSinkRTPImpl::setRemoteDetails(const string& address, Ice::Int port, c
     /* This method is essentially a passthru to the RTPSourceImpl. It takes care of
      * actually attaching the transport.
      */
-    mImpl->mSession->setRemoteDetails(address, port);
+    mImpl->mSessionAdapter->setRemoteDetails(address, port);
 
     /* We do store it though in case we have not yet received a packet from the remote side but
      * are asked for the remote address. It is also stored for replication purposes.
@@ -195,7 +201,7 @@ void StreamSinkRTPImpl::setRemoteDetails(const string& address, Ice::Int port, c
     mImpl->mSinkStateItem->mRemoteAddress = address;
     mImpl->mSinkStateItem->mRemotePort = port;
 
-    mImpl->mSession->replicateState(0, mImpl->mSinkStateItem, 0);
+    mImpl->mSessionAdapter->replicateState(mImpl->mSinkStateItem);
 }
 
 /**
@@ -206,7 +212,7 @@ std::string StreamSinkRTPImpl::getRemoteAddress(const Ice::Current&)
     pjmedia_transport_info transportInfo;
 
     pjmedia_transport_info_init(&transportInfo);
-    pjmedia_transport_get_info(mImpl->mSession->getTransport(), &transportInfo);
+    pjmedia_transport_get_info(mImpl->mSessionAdapter->getTransport(), &transportInfo);
 
     if (transportInfo.src_rtp_name.addr.sa_family != PJ_AF_INET &&
 	transportInfo.src_rtp_name.addr.sa_family != PJ_AF_INET6)
@@ -229,7 +235,7 @@ Ice::Int StreamSinkRTPImpl::getRemotePort(const Ice::Current&)
     pjmedia_transport_info transportInfo;
 
     pjmedia_transport_info_init(&transportInfo);
-    pjmedia_transport_get_info(mImpl->mSession->getTransport(), &transportInfo);
+    pjmedia_transport_get_info(mImpl->mSessionAdapter->getTransport(), &transportInfo);
 
     if (transportInfo.src_rtp_name.addr.sa_family != PJ_AF_INET &&
         transportInfo.src_rtp_name.addr.sa_family != PJ_AF_INET6)
diff --git a/src/RTPSink.h b/src/RTPSink.h
index 6254d5a..d2e7b43 100644
--- a/src/RTPSink.h
+++ b/src/RTPSink.h
@@ -8,7 +8,9 @@
 
 #pragma once
 
+#include "SessionAdapter.h"
 #include <boost/shared_ptr.hpp>
+#include <IceUtil/Handle.h>
 
 /**
  * Forward definition for our private implementation of StreamSinkRTP.
@@ -21,7 +23,7 @@ class StreamSinkRTPImplPriv;
 class StreamSinkRTPImpl : public AsteriskSCF::Media::RTP::V1::StreamSinkRTP
 {
 public:
-    StreamSinkRTPImpl(const RTPSessionImplPtr&, const std::string&);
+    StreamSinkRTPImpl(const AsteriskSCF::PJMediaRTP::SessionAdapterPtr&, const std::string&);
     void write(const AsteriskSCF::Media::V1::FrameSeq&, const Ice::Current&);
     void setSource(const AsteriskSCF::Media::V1::StreamSourcePrx&, const Ice::Current&);
     AsteriskSCF::Media::V1::StreamSourcePrx getSource(const Ice::Current&);
@@ -37,3 +39,5 @@ private:
      */
     boost::shared_ptr<StreamSinkRTPImplPriv> mImpl;
 };
+
+typedef IceUtil::Handle<StreamSinkRTPImpl> StreamSinkRTPImplPtr;
\ No newline at end of file
diff --git a/src/RTPSource.cpp b/src/RTPSource.cpp
index 3bc22c1..6a85a0d 100644
--- a/src/RTPSource.cpp
+++ b/src/RTPSource.cpp
@@ -14,6 +14,9 @@
  * at the top of the source tree.
  */
 
+#include "RTPSource.h"
+#include "RtpStateReplicationIf.h"
+
 #include <pjlib.h>
 #include <pjmedia.h>
 #include <Ice/Ice.h>
@@ -24,16 +27,12 @@
 #include <AsteriskSCF/logger.h>
 #include <AsteriskSCF/System/Component/ReplicaIf.h>
 
-#include "RtpStateReplicationIf.h"
-
-#include "RTPSession.h"
-#include "RTPSource.h"
-
 using namespace std;
 using namespace AsteriskSCF::Core::Discovery::V1;
 using namespace AsteriskSCF::Media::V1;
 using namespace AsteriskSCF::Media::RTP::V1;
 using namespace AsteriskSCF::System::Logging;
+using namespace AsteriskSCF::PJMediaRTP;
 
 namespace
 {
@@ -49,7 +48,7 @@ public:
     /**
      * Constructor for our StreamSourceRTPImplPriv class.
      */
-    StreamSourceRTPImplPriv(const RTPSessionImplPtr&, const string&);
+    StreamSourceRTPImplPriv(const SessionAdapterPtr&, const string&);
 
     /**
      * A structure containing incoming pjmedia session data.
@@ -59,19 +58,21 @@ public:
     /**
      * A pointer to the RTP session we are associated with.
      */
-    RTPSessionImplPtr mSession;
+    SessionAdapterPtr mSessionAdapter;
 
     /**
      * Stream source state item.
      */
     RtpStreamSourceStateItemPtr mSourceStateItem;
+
+    string mSessionId;
 };
 
 /**
  * Constructor for the StreamSourceRTPImplPriv class.
  */
-StreamSourceRTPImplPriv::StreamSourceRTPImplPriv(const RTPSessionImplPtr& session, const string& sessionId) :
-    mSession(session), mSourceStateItem(new RtpStreamSourceStateItem)
+StreamSourceRTPImplPriv::StreamSourceRTPImplPriv(const SessionAdapterPtr& session, const string& sessionId) :
+    mSessionAdapter(session), mSourceStateItem(new RtpStreamSourceStateItem), mSessionId(sessionId)
 {
     pjmedia_rtp_session_init(&mIncomingSession, 0, 0);
     mSourceStateItem->mSessionId = sessionId;
@@ -81,7 +82,7 @@ StreamSourceRTPImplPriv::StreamSourceRTPImplPriv(const RTPSessionImplPtr& sessio
 /**
  * Constructor for the StreamSourceRTPImpl class.
  */
-StreamSourceRTPImpl::StreamSourceRTPImpl(const RTPSessionImplPtr& session, const string& sessionId) :
+StreamSourceRTPImpl::StreamSourceRTPImpl(const SessionAdapterPtr& session, const string& sessionId) :
     mImpl(new StreamSourceRTPImplPriv(session, sessionId))
 {
 }
@@ -93,7 +94,7 @@ void StreamSourceRTPImpl::setSink(const AsteriskSCF::Media::V1::StreamSinkPrx& s
 {
     mImpl->mSourceStateItem->mSink = sink;
 
-    mImpl->mSession->replicateState(0, 0, mImpl->mSourceStateItem);
+    mImpl->mSessionAdapter->replicateState(mImpl->mSourceStateItem);
 }
 
 /**
@@ -109,7 +110,7 @@ AsteriskSCF::Media::V1::StreamSinkPrx StreamSourceRTPImpl::getSink(const Ice::Cu
  */
 AsteriskSCF::Media::V1::FormatSeq StreamSourceRTPImpl::getFormats(const Ice::Current&)
 {
-    return mImpl->mSession->getFormats();
+    return mImpl->mSessionAdapter->getFormats();
 }
 
 /**
@@ -118,7 +119,7 @@ AsteriskSCF::Media::V1::FormatSeq StreamSourceRTPImpl::getFormats(const Ice::Cur
 std::string StreamSourceRTPImpl::getId(const Ice::Current& current)
 {
     /* For now utilize the id of the session */
-    return mImpl->mSession->getId(current);
+    return mImpl->mSessionId;
 }
 
 /**
@@ -138,7 +139,7 @@ std::string StreamSourceRTPImpl::getLocalAddress(const Ice::Current&)
     pjmedia_transport_info transportInfo;
 
     pjmedia_transport_info_init(&transportInfo);
-    pjmedia_transport_get_info(mImpl->mSession->getTransport(), &transportInfo);
+    pjmedia_transport_get_info(mImpl->mSessionAdapter->getTransport(), &transportInfo);
 
     char tmp_addr[PJ_INET6_ADDRSTRLEN];
     return pj_sockaddr_print(&transportInfo.sock_info.rtp_addr_name, tmp_addr, sizeof(tmp_addr), 0);
@@ -152,7 +153,7 @@ Ice::Int StreamSourceRTPImpl::getLocalPort(const Ice::Current&)
     pjmedia_transport_info transportInfo;
 
     pjmedia_transport_info_init(&transportInfo);
-    pjmedia_transport_get_info(mImpl->mSession->getTransport(), &transportInfo);
+    pjmedia_transport_get_info(mImpl->mSessionAdapter->getTransport(), &transportInfo);
 
     return pj_sockaddr_get_port(&transportInfo.sock_info.rtp_addr_name);
 }
@@ -189,7 +190,7 @@ static void receiveRTP(void *userdata, void *packet, pj_ssize_t size)
 
     if (source->mImpl->mSourceStateItem->mSink != 0)
     {
-        FormatPtr mediaformat = source->mImpl->mSession->getFormat(header->pt);
+        FormatPtr mediaformat = source->mImpl->mSessionAdapter->getFormat(header->pt);
 
         if (mediaformat != 0)
         {
@@ -251,7 +252,7 @@ void StreamSourceRTPImpl::setRemoteDetails(const string& address, Ice::Int port)
     pjmedia_transport_info transportInfo;
 
     pjmedia_transport_info_init(&transportInfo);
-    pjmedia_transport_get_info(mImpl->mSession->getTransport(), &transportInfo);
+    pjmedia_transport_get_info(mImpl->mSessionAdapter->getTransport(), &transportInfo);
 
     if (transportInfo.sock_info.rtp_addr_name.addr.sa_family != addr.addr.sa_family)
     {
@@ -261,10 +262,10 @@ void StreamSourceRTPImpl::setRemoteDetails(const string& address, Ice::Int port)
     pj_sockaddr_set_port(&addr, static_cast<pj_uint16_t>(port));
 
     /* In case we were already attached go ahead and detach */
-    pjmedia_transport_detach(mImpl->mSession->getTransport(), this);
+    pjmedia_transport_detach(mImpl->mSessionAdapter->getTransport(), this);
 
     /* All ready... actually do it! */
-    status = pjmedia_transport_attach(mImpl->mSession->getTransport(), this, &addr, NULL, pj_sockaddr_get_len(&addr), &receiveRTP, NULL);
+    status = pjmedia_transport_attach(mImpl->mSessionAdapter->getTransport(), this, &addr, NULL, pj_sockaddr_get_len(&addr), &receiveRTP, NULL);
 
     if (status != PJ_SUCCESS)
     {
diff --git a/src/RTPSource.h b/src/RTPSource.h
index d12bc48..6eddc1f 100644
--- a/src/RTPSource.h
+++ b/src/RTPSource.h
@@ -8,7 +8,9 @@
 
 #pragma once
 
+#include "SessionAdapter.h"
 #include <boost/shared_ptr.hpp>
+#include <IceUtil/Handle.h>
 
 /**
  * Forward definition for our private implementation of StreamSourceRTP.
@@ -21,7 +23,7 @@ class StreamSourceRTPImplPriv;
 class StreamSourceRTPImpl : public AsteriskSCF::Media::RTP::V1::StreamSourceRTP
 {
 public:
-    StreamSourceRTPImpl(const RTPSessionImplPtr&, const std::string&);
+    StreamSourceRTPImpl(const AsteriskSCF::PJMediaRTP::SessionAdapterPtr&, const std::string&);
     void setSink(const AsteriskSCF::Media::V1::StreamSinkPrx&, const Ice::Current&);
     AsteriskSCF::Media::V1::StreamSinkPrx getSink(const Ice::Current&);
     AsteriskSCF::Media::V1::FormatSeq getFormats(const Ice::Current&);
@@ -38,3 +40,5 @@ public:
      */
     boost::shared_ptr<StreamSourceRTPImplPriv> mImpl;
 };
+
+typedef IceUtil::Handle<StreamSourceRTPImpl> StreamSourceRTPImplPtr;
\ No newline at end of file
diff --git a/src/ReplicationAdapter.h b/src/ReplicationAdapter.h
new file mode 100644
index 0000000..32d341d
--- /dev/null
+++ b/src/ReplicationAdapter.h
@@ -0,0 +1,43 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2010, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+
+#pragma once
+
+#include "RtpStateReplicationIf.h"
+#include <string>
+#include <boost/shared_ptr.hpp>
+
+namespace AsteriskSCF
+{
+namespace PJMediaRTP
+{
+class ReplicationAdapter;
+typedef boost::shared_ptr<ReplicationAdapter> ReplicationAdapterPtr;
+
+class ReplicationAdapter
+{
+public:
+    virtual ~ReplicationAdapter() {}
+
+    virtual void update(const AsteriskSCF::Media::RTP::V1::RtpSessionStateItemPtr& item) = 0;
+    virtual void update(const AsteriskSCF::Media::RTP::V1::RtpStreamSinkStateItemPtr& item) = 0;
+    virtual void update(const AsteriskSCF::Media::RTP::V1::RtpStreamSourceStateItemPtr& item) = 0;
+    virtual void destroy() = 0;
+};
+
+} /* End of namespace PJMediaRTP */
+} /* End of namespace AsteriskSCF */
+
diff --git a/src/RtpStateReplicator.h b/src/RtpStateReplicator.h
index b244dda..3bb1b4b 100644
--- a/src/RtpStateReplicator.h
+++ b/src/RtpStateReplicator.h
@@ -16,16 +16,14 @@
 
 #pragma once
 
-#include <Ice/Ice.h>
-
-#include <AsteriskSCF/Replication/StateReplicator.h>
 #include "RtpStateReplicationIf.h"
 #include "RTPConfiguration.h"
+#include "PJMediaEnvironment.h"
 
+#include <Ice/Ice.h>
+#include <AsteriskSCF/Replication/StateReplicator.h>
 #include <boost/shared_ptr.hpp>
 
-#include "RTPConfiguration.h"
-
 
 typedef AsteriskSCF::Replication::StateReplicator<
     AsteriskSCF::Media::RTP::V1::RtpStateReplicator, 
@@ -42,7 +40,7 @@ struct RtpStateReplicatorListenerImpl;
 class RtpStateReplicatorListenerI : public AsteriskSCF::Media::RTP::V1::RtpStateReplicatorListener
 {
 public:
-    RtpStateReplicatorListenerI(const Ice::ObjectAdapterPtr&, pj_pool_factory*, 
+    RtpStateReplicatorListenerI(const Ice::ObjectAdapterPtr&, const AsteriskSCF::PJMediaRTP::PJMediaEnvironmentPtr&, 
 	const AsteriskSCF::Media::RTP::V1::RtpGeneralStateItemPtr&,
 	const ConfigurationServiceImplPtr&);
     ~RtpStateReplicatorListenerI();
diff --git a/src/RtpStateReplicatorListener.cpp b/src/RtpStateReplicatorListener.cpp
index 4f3cef1..a71175a 100644
--- a/src/RtpStateReplicatorListener.cpp
+++ b/src/RtpStateReplicatorListener.cpp
@@ -24,40 +24,47 @@
 #include <AsteriskSCF/System/Component/ReplicaIf.h>
 
 #include "RtpStateReplicator.h"
+#include "ReplicationAdapter.h"
 #include "RTPSession.h"
-#include "RTPSink.h"
-#include "RTPSource.h"
 
 using namespace std;
 using namespace AsteriskSCF::Media::RTP::V1;
+using namespace AsteriskSCF::PJMediaRTP;
 
 class RtpStateReplicatorItem
 {
 public:
-    RtpStateReplicatorItem() { }
+
     ~RtpStateReplicatorItem()
     {
-	mSession->release(Ice::Current());
+        mSession->destroy();
     }
 
-    // Helper function which sets the session on this replicator item
-    void setSession(RTPSessionImplPtr session) { mSession = session; };
+    //
+    // Standard set/get pair.
+    //
+    void setSession(const ReplicationAdapterPtr& session)
+    {
+        mSession = session;
+    }
 
-    // Helper function which gets the session
-    RTPSessionImplPtr getSession() { return mSession; };
+    ReplicationAdapterPtr getSession()
+    {
+        return mSession;
+    }
 
 private:
-    // Pointer to the session that we are managing
-    RTPSessionImplPtr mSession;
+
+    ReplicationAdapterPtr mSession;
 };
 
 struct RtpStateReplicatorListenerImpl
 {
 public:
-    RtpStateReplicatorListenerImpl(const Ice::ObjectAdapterPtr& adapter, pj_pool_factory *poolFactory,
+    RtpStateReplicatorListenerImpl(const Ice::ObjectAdapterPtr& adapter, const PJMediaEnvironmentPtr& env,
 	const RtpGeneralStateItemPtr& generalState,
 	const ConfigurationServiceImplPtr& configurationService)
-        : mId(IceUtil::generateUUID()), mAdapter(adapter), mPoolFactory(poolFactory), mGeneralState(generalState),
+        : mId(IceUtil::generateUUID()), mAdapter(adapter), mEnvironment(env), mGeneralState(generalState),
 	  mConfigurationService(configurationService) {}
     
     void removeStateNoticeImpl(const Ice::StringSeq& itemKeys)
@@ -68,6 +75,7 @@ public:
             mStateItems.erase((*key));
         }
     }
+    
     void setStateNoticeImpl(const RtpStateItemSeq& items)
     {
 	class visitor : public AsteriskSCF::Media::RTP::V1::RtpStateItemVisitor
@@ -97,18 +105,16 @@ public:
 		    localitem = newitem;
 		    mImpl->mStateItems.insert(make_pair(item->mSessionId, newitem));
 
-		    RTPSessionImplPtr localSession =
-                        new RTPSessionImpl(mImpl->mAdapter, mImpl->mPoolFactory, item->mSessionIdentity,
-			    item->mSinkIdentity, item->mSourceIdentity, item->mPort, item->mFormats, item->mIPv6,
-			    mImpl->mConfigurationService);
-		    localitem->setSession(localSession);
+                    localitem->setSession(
+                        AsteriskSCF::PJMediaRTP::RTPSession::create(mImpl->mAdapter, mImpl->mEnvironment, item,
+                                mImpl->mConfigurationService));
 		}
 		else
 		{
 		    localitem = i->second;
 		}
 
-	        localitem->getSession()->associatePayloads(item->mPayloadstoFormats, Ice::Current());
+	        localitem->getSession()->update(item);
 	    }
 		    
 	    void visitRtpStreamSinkStateItem(const RtpStreamSinkStateItemPtr &item)
@@ -117,9 +123,7 @@ public:
                     mImpl->mStateItems.find(item->mSessionId);
 		if (i != mImpl->mStateItems.end())
 		{
-		    i->second->getSession()->getSink()->setSource(item->mSource, Ice::Current());
-		    i->second->getSession()->getSink()->setRemoteDetails(item->mRemoteAddress, item->mRemotePort,
-                            Ice::Current());
+                    i->second->getSession()->update(item);
 		}
 	    }
 		    
@@ -129,7 +133,7 @@ public:
                     mImpl->mStateItems.find(item->mSessionId);
 		if (i != mImpl->mStateItems.end())
 		{
-		    i->second->getSession()->getSource()->setSink(item->mSink, Ice::Current());
+                    i->second->getSession()->update(item);
 		}
 	    }
 	};
@@ -145,15 +149,15 @@ public:
     string mId;
     map<string, boost::shared_ptr<RtpStateReplicatorItem> > mStateItems;
     Ice::ObjectAdapterPtr mAdapter;
-    pj_pool_factory *mPoolFactory;
+    PJMediaEnvironmentPtr mEnvironment;
     RtpGeneralStateItemPtr mGeneralState;
     ConfigurationServiceImplPtr mConfigurationService;
 };
 
 RtpStateReplicatorListenerI::RtpStateReplicatorListenerI(const Ice::ObjectAdapterPtr& adapter,
-    pj_pool_factory *poolFactory, const RtpGeneralStateItemPtr& generalState,
+    const PJMediaEnvironmentPtr& env, const RtpGeneralStateItemPtr& generalState,
     const ConfigurationServiceImplPtr& configurationService)
-    : mImpl(new RtpStateReplicatorListenerImpl(adapter, poolFactory, generalState, configurationService)) {}
+    : mImpl(new RtpStateReplicatorListenerImpl(adapter, env, generalState, configurationService)) {}
 
 RtpStateReplicatorListenerI::~RtpStateReplicatorListenerI()
 {
diff --git a/src/SessionAdapter.h b/src/SessionAdapter.h
new file mode 100644
index 0000000..dfd2234
--- /dev/null
+++ b/src/SessionAdapter.h
@@ -0,0 +1,64 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2010, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+
+#pragma once
+
+#include <string>
+#include <boost/shared_ptr.hpp>
+#include "RtpStateReplicationIf.h"
+
+//
+// XXX forward declaration that will be obsoleted soon.
+// 
+struct pjmedia_transport;
+
+namespace AsteriskSCF
+{
+namespace PJMediaRTP
+{
+
+class SessionAdapter;
+typedef boost::shared_ptr<SessionAdapter> SessionAdapterPtr;
+
+/**
+ * By formalizing the interface between the session servant and the source and sink servants, we can
+ * remove the physical compile-time interdependencies. For the moment, this means that there will be
+ * an indirect circular reference between the three entities. This is fine for the moment because
+ * they are actually quite tightly coupled implementation-wise, e.g. setRemoteDetails propagates to the
+ * source servant.
+ **/
+class SessionAdapter
+{
+public:
+    virtual ~SessionAdapter() {}
+
+    virtual void replicateState(const AsteriskSCF::Media::RTP::V1::RtpStreamSinkStateItemPtr& sinkStateItem) = 0;
+    virtual void replicateState(const AsteriskSCF::Media::RTP::V1::RtpStreamSourceStateItemPtr& sourceStateItem) = 0; 
+
+    virtual AsteriskSCF::Media::V1::FormatPtr getFormat(int payload) = 0;
+    virtual int getPayload(const AsteriskSCF::Media::V1::FormatPtr& format) = 0;
+    virtual AsteriskSCF::Media::V1::FormatSeq getFormats() = 0;
+    virtual void setRemoteDetails(const std::string& host, int port) = 0;
+
+    //
+    // XXX this will be removed in following steps.
+    //
+    virtual pjmedia_transport* getTransport() = 0;
+};
+
+} /* End of namespace PJMediaRTP */
+} /* End of namespace AsteriskSCF */
+

-----------------------------------------------------------------------


-- 
asterisk-scf/integration/media_rtp_pjmedia.git



More information about the asterisk-scf-commits mailing list