[asterisk-scf-commits] asterisk-scf/integration/media_rtp_pjmedia.git branch "nat-traversal" updated.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Tue May 17 15:00:16 CDT 2011


branch "nat-traversal" has been updated
       via  593969853b9dff2a4b1032ab431807c38e7e20f0 (commit)
      from  063e9b5292bd991bebe02973acdaf2e78aa167fa (commit)

Summary of changes:
 src/CMakeLists.txt                                 |   15 +-
 src/ICEConfiguration.h                             |    2 +-
 src/ICETransport.cpp                               |  184 ++++++
 src/{NATModule.h => ICETransport.h}                |   38 +-
 src/MediaRTPpjmedia.cpp                            |   32 +-
 src/NATModule.cpp                                  |    5 +-
 src/NATModule.h                                    |    3 +-
 src/PJMediaEndpoint.cpp                            |   45 ++
 src/{NATModule.h => PJMediaEndpoint.h}             |   33 +-
 src/PJMediaEnvironment.cpp                         |   12 +-
 src/PJMediaEnvironment.h                           |   14 +-
 src/PJMediaTransport.cpp                           |   61 ++
 src/{NATModule.h => PJMediaTransport.h}            |   30 +-
 src/RTPICESession.cpp                              |  624 +++++++++++---------
 src/RTPICESession.h                                |   20 +-
 src/RTPSink.cpp                                    |   83 ++--
 src/RTPSink.h                                      |   12 +-
 src/RTPSource.cpp                                  |   65 +--
 src/RTPSource.h                                    |    9 +-
 src/{NATModule.h => ReplicationAdapter.h}          |   35 +-
 src/RtpStateReplicator.h                           |    3 +-
 src/RtpStateReplicatorListener.cpp                 |   45 +-
 ...JLibConfiguration.cpp => SRTPConfiguration.cpp} |   16 +-
 src/{PJLibConfiguration.h => SRTPConfiguration.h}  |   36 +-
 src/SRTPTransport.cpp                              |   54 ++
 src/{NATModule.h => SRTPTransport.h}               |   31 +-
 src/SessionAdapter.h                               |   53 ++
 src/UDPTransport.cpp                               |   69 +++
 src/{NATModule.h => UDPTransport.h}                |   47 +-
 29 files changed, 1128 insertions(+), 548 deletions(-)
 create mode 100644 src/ICETransport.cpp
 copy src/{NATModule.h => ICETransport.h} (50%)
 create mode 100644 src/PJMediaEndpoint.cpp
 copy src/{NATModule.h => PJMediaEndpoint.h} (62%)
 create mode 100644 src/PJMediaTransport.cpp
 copy src/{NATModule.h => PJMediaTransport.h} (58%)
 copy src/{NATModule.h => ReplicationAdapter.h} (55%)
 copy src/{PJLibConfiguration.cpp => SRTPConfiguration.cpp} (64%)
 copy src/{PJLibConfiguration.h => SRTPConfiguration.h} (64%)
 create mode 100644 src/SRTPTransport.cpp
 copy src/{NATModule.h => SRTPTransport.h} (56%)
 create mode 100644 src/SessionAdapter.h
 create mode 100644 src/UDPTransport.cpp
 copy src/{NATModule.h => UDPTransport.h} (55%)


- Log -----------------------------------------------------------------
commit 593969853b9dff2a4b1032ab431807c38e7e20f0
Author: Brent Eagles <beagles at digium.com>
Date:   Tue May 17 17:28:57 2011 -0230

    Added missing transport files. Broke "knowledge" dependency between the replication listener, session
    implementation and the source and sinks. Added a SRTP transport placeholder (configuration coming).

diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index ad6dc88..ec6e80d 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -9,10 +9,8 @@ asterisk_scf_slice_include_directories(${API_SLICE_DIR})
 
 asterisk_scf_component_init(media_rtp_pjmedia)
 asterisk_scf_component_add_file(media_rtp_pjmedia MediaRTPpjmedia.cpp)
-asterisk_scf_component_add_file(media_rtp_pjmedia RTPSession.cpp)
 asterisk_scf_component_add_file(media_rtp_pjmedia RTPSource.cpp)
 asterisk_scf_component_add_file(media_rtp_pjmedia RTPSink.cpp)
-asterisk_scf_component_add_file(media_rtp_pjmedia RTPSession.h)
 asterisk_scf_component_add_file(media_rtp_pjmedia RTPSource.h)
 asterisk_scf_component_add_file(media_rtp_pjmedia RTPSink.h)
 asterisk_scf_component_add_file(media_rtp_pjmedia RTPICESession.cpp)
@@ -23,12 +21,25 @@ asterisk_scf_component_add_file(media_rtp_pjmedia NATConfig.cpp)
 asterisk_scf_component_add_file(media_rtp_pjmedia NATConfig.h)
 asterisk_scf_component_add_file(media_rtp_pjmedia ICEConfiguration.cpp)
 asterisk_scf_component_add_file(media_rtp_pjmedia ICEConfiguration.h)
+asterisk_scf_component_add_file(media_rtp_pjmedia SRTPConfiguration.cpp)
+asterisk_scf_component_add_file(media_rtp_pjmedia SRTPConfiguration.h)
 asterisk_scf_component_add_file(media_rtp_pjmedia PJMediaEnvironment.cpp)
 asterisk_scf_component_add_file(media_rtp_pjmedia PJMediaEnvironment.h)
+asterisk_scf_component_add_file(media_rtp_pjmedia PJMediaEndpoint.cpp)
+asterisk_scf_component_add_file(media_rtp_pjmedia PJMediaEndpoint.h)
+asterisk_scf_component_add_file(media_rtp_pjmedia PJMediaTransport.cpp)
+asterisk_scf_component_add_file(media_rtp_pjmedia PJMediaTransport.h)
+asterisk_scf_component_add_file(media_rtp_pjmedia UDPTransport.cpp)
+asterisk_scf_component_add_file(media_rtp_pjmedia UDPTransport.h)
+asterisk_scf_component_add_file(media_rtp_pjmedia ICETransport.cpp)
+asterisk_scf_component_add_file(media_rtp_pjmedia ICETransport.h)
+asterisk_scf_component_add_file(media_rtp_pjmedia SRTPTransport.cpp)
+asterisk_scf_component_add_file(media_rtp_pjmedia SRTPTransport.h)
 asterisk_scf_component_add_file(media_rtp_pjmedia PJLibConfiguration.cpp)
 asterisk_scf_component_add_file(media_rtp_pjmedia PJLibConfiguration.h)
 asterisk_scf_component_add_file(media_rtp_pjmedia PJUtil.h)
 asterisk_scf_component_add_file(media_rtp_pjmedia ServiceConfig.h)
+asterisk_scf_component_add_file(media_rtp_pjmedia SessionAdapter.h)
 
 asterisk_scf_component_add_file(media_rtp_pjmedia RtpStateReplicatorListener.cpp)
 asterisk_scf_component_add_file(media_rtp_pjmedia RtpStateReplicator.h)
diff --git a/src/ICEConfiguration.h b/src/ICEConfiguration.h
index 31c265a..281ace5 100644
--- a/src/ICEConfiguration.h
+++ b/src/ICEConfiguration.h
@@ -67,5 +67,5 @@ private:
     void operator=(const ICEConfiguration&);
 };
 
-} /* End of namespace ICEMediaSession */
+} /* End of namespace PJMediaRTP */
 } /* End of namespace AsteriskSCF */
diff --git a/src/ICETransport.cpp b/src/ICETransport.cpp
new file mode 100644
index 0000000..52e36a3
--- /dev/null
+++ b/src/ICETransport.cpp
@@ -0,0 +1,184 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2010, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+
+#include "ICETransport.h"
+#include "PJUtil.h"
+#include <pjmedia.h>
+#include <pjlib.h>
+#include <AsteriskSCF/System/ExceptionsIf.h>
+#include <map>
+
+using namespace AsteriskSCF::PJMediaRTP;
+using namespace AsteriskSCF::System::V1;
+using namespace AsteriskSCF::PJUtil;
+using namespace std;
+
+namespace 
+{
+class ICECallbackAdapter
+{
+public:
+    static void addEntry(pjmedia_transport* transport, const ICETransportPtr& callback);
+    static void onICEComplete(pjmedia_transport* transport, pj_ice_strans_op operation, pj_status_t status);
+
+    struct CallbackRecord
+    {
+        bool connected;
+        ICETransportPtr transport;
+    };
+    typedef std::map<pjmedia_transport*, CallbackRecord> TransportMap;
+
+private:
+    static TransportMap mTransportMap;
+    static boost::shared_mutex mLock;
+};
+
+//
+// Static member initializations.
+//
+ICECallbackAdapter::TransportMap ICECallbackAdapter::mTransportMap;
+boost::shared_mutex ICECallbackAdapter::mLock;
+
+//
+// For some reason the ICE media transport doesn't have the concept of associating user data with a transport, so we
+// have to map it "out of band". The problem is, there is a race condition in that the ICE completion callback could be
+// invoked before we get a chance to add the transport.  The solution to that is to allow an entry to be created when
+// the ICE completion callback arrives and there isn't a table entry. When the addEntry runs, it will see the entry and
+// simply update the appropriate field.
+// 
+void ICECallbackAdapter::addEntry(pjmedia_transport* transport, const ICETransportPtr& callback)
+{
+    boost::unique_lock<boost::shared_mutex> lock(mLock);
+    TransportMap::iterator i = mTransportMap.find(transport);
+    if (i != mTransportMap.end())
+    {
+        i->second.transport = callback;
+    }
+    else
+    {
+        CallbackRecord r;
+        r.connected = false;
+        r.transport = callback;
+        mTransportMap.insert(make_pair(transport, r));
+    }
+}
+
+void ICECallbackAdapter::onICEComplete(pjmedia_transport* transport, pj_ice_strans_op operation, pj_status_t status)
+{
+    //
+    // As far as I can tell, only PJ_ICE_STRANS_OP_NEGOTIATION should get here.
+    // 
+    switch (operation)
+    {
+        case PJ_ICE_STRANS_OP_INIT:
+            //
+            // Initialization is complete. From what I can tell, this should not get here.
+            //
+            break;
+        case PJ_ICE_STRANS_OP_NEGOTIATION:
+            //
+            // Negotiation is complete.
+            //
+        {
+            boost::unique_lock<boost::shared_mutex> lock(mLock);
+            TransportMap::iterator i = mTransportMap.find(transport);
+            if (i == mTransportMap.end())
+            {
+                CallbackRecord r;
+                r.connected = true;
+                mTransportMap.insert(make_pair(transport, r));
+            }
+            else
+            {
+                i->second.connected = true;
+                i->second.transport->onSetupComplete(transport, status);
+            }
+        }
+            break;
+        case PJ_ICE_STRANS_OP_KEEP_ALIVE:
+            //
+            // Keep-alive has successfully completed. From what I can tell, this should not get here.
+            // 
+            break;
+    };
+}
+
+}
+
+ICETransport::~ICETransport()
+{
+    //
+    // TODO : cleanup ICE transport, the transport itself is closed by the parent class.
+    // 
+}
+
+void ICETransport::onSetupComplete(pjmedia_transport* transport, int status)
+{
+    if (fail(status))
+    {
+        //
+        // TODO!
+        //
+        return;
+    }
+
+    pjmedia_transport_info info;
+    pjmedia_transport_info_init(&info);
+    pjmedia_transport_get_info(transport, &info);
+
+    pjmedia_ice_transport_info* iceInfo = 0;
+    for (unsigned i = 0; i < info.specific_info_cnt; ++i)
+    {
+        if (info.spc_info[i].type == PJMEDIA_TRANSPORT_TYPE_ICE)
+        {
+            iceInfo = (pjmedia_ice_transport_info*)(info.spc_info[i].buffer);
+        }
+    }
+
+    if (iceInfo != 0 && iceInfo->role == PJ_ICE_SESS_ROLE_CONTROLLING)
+    {
+        if (mLastKnownAddr && pj_sockaddr_cmp(&info.sock_info.rtp_addr_name, mLastKnownAddr.get()))
+        {
+            //
+            // Address has changed! We need to let Session listeners know!
+            // TODO! 
+            //
+            pj_memcpy(mLastKnownAddr.get(), &info.sock_info.rtp_addr_name, sizeof(pj_sockaddr));
+        }
+    }
+}
+
+ICETransportPtr ICETransport::create(const PJMediaEndpointPtr& ep)
+{
+    pjmedia_transport* t;
+    PJICECallbackPtr callback(new pjmedia_ice_cb);
+    callback->on_ice_complete = &ICECallbackAdapter::onICEComplete;
+    pj_status_t result = pjmedia_ice_create(ep->endpoint(), "ASCF_ICE_MEDIA", 10, 0, callback.get(),
+      &t);
+    if (fail(result))
+    {
+        throw InternalInitializationException("Unable to create new ICE media transport");
+    }
+    ICETransportPtr transport(new ICETransport(t, callback));
+    ICECallbackAdapter::addEntry(t, transport);
+    return transport;
+}
+
+ICETransport::ICETransport(pjmedia_transport* t, const PJICECallbackPtr& cb) :
+    PJMediaTransport(t),
+    mCallback(cb)
+{ 
+}
diff --git a/src/NATModule.h b/src/ICETransport.h
similarity index 50%
copy from src/NATModule.h
copy to src/ICETransport.h
index 9b4c4bd..5370fec 100644
--- a/src/NATModule.h
+++ b/src/ICETransport.h
@@ -16,41 +16,49 @@
 
 #pragma once
 
-#include "PJMediaEnvironment.h"
+#include "PJMediaTransport.h"
+#include "PJMediaEndpoint.h"
+#include <Ice/PropertiesF.h>
 #include <boost/shared_ptr.hpp>
 
 //
 // Forward declarations.
-//
-struct pj_ice_strans_cfg;
+// 
+struct pjmedia_transport;
+union pj_sockaddr;
+struct pjmedia_ice_cb;
 
 namespace AsteriskSCF
 {
 namespace PJMediaRTP
 {
-class NATModule;
-typedef boost::shared_ptr<NATModule> NATModulePtr;
 
-class NATModule
+class ICETransport;
+typedef boost::shared_ptr<ICETransport> ICETransportPtr;
+typedef boost::shared_ptr<pjmedia_ice_cb> PJICECallbackPtr;
+typedef boost::shared_ptr<pj_sockaddr> PJSockAddrPtr;
+
+class ICETransport : public PJMediaTransport
 {
 public:
-    ~NATModule();
 
-    static NATModulePtr create(const PJMediaEnvironmentPtr& environ);
+    ~ICETransport();
+    void onSetupComplete(pjmedia_transport* transport, int status);
 
-private:
-    boost::shared_ptr<pj_ice_strans_cfg> mTransactionConfig;
+    static ICETransportPtr create(const PJMediaEndpointPtr& ep);
 
-    NATModule(const boost::shared_ptr<pj_ice_strans_cfg>& transactionConfig);
+private:
+    boost::shared_mutex mLock;
+    PJICECallbackPtr mCallback;
+    PJSockAddrPtr mLastKnownAddr;
 
-    void destroyImpl();
-    void startImpl();
+    ICETransport(pjmedia_transport* t, const PJICECallbackPtr& cb);
 
     //
     // Hidden and unimplemented.
     //
-    NATModule(const NATModule&);
-    void operator=(const NATModule&);
+    ICETransport(const ICETransport&);
+    void operator=(const ICETransport&);
 };
 
 } /* End of namespace PJMediaRTP */
diff --git a/src/MediaRTPpjmedia.cpp b/src/MediaRTPpjmedia.cpp
index fe5954c..8f000d1 100644
--- a/src/MediaRTPpjmedia.cpp
+++ b/src/MediaRTPpjmedia.cpp
@@ -34,7 +34,7 @@
 
 #include "RtpStateReplicationIf.h"
 
-#include "RTPSession.h"
+#include "RTPICESession.h"
 #include "RtpStateReplicator.h"
 
 #include "PJMediaEnvironment.h"
@@ -62,10 +62,15 @@ static const string MediaServiceId("RTPMediaService");
 class RTPMediaServiceImpl : public RTPMediaService
 {
 public:
-    RTPMediaServiceImpl(const Ice::ObjectAdapterPtr&, const ReplicaPtr&,
+    RTPMediaServiceImpl(const Ice::ObjectAdapterPtr&, const ReplicaPrx&,
             const AsteriskSCF::Discovery::SmartProxy<RtpStateReplicatorPrx>&);
     RTPSessionPrx allocate(const FormatSeq&, const Ice::Current&);
     pj_pool_factory *getPoolFactory() { return mEnvironment->poolFactory(); };
+
+    PJMediaEnvironmentPtr getEnvironment()
+    {
+        return mEnvironment;
+    }
 private:
     /**
      * A pointer to the object adapter that objects should be added to.
@@ -75,9 +80,9 @@ private:
     PJMediaEnvironmentPtr mEnvironment;
 
     /**
-     * A pointer to the replica service.
+     * A proxy to the local replica control.
      */
-    ReplicaPtr mReplicaService;
+    ReplicaPrx mReplicaControlPrx;
 
     /**
      * A proxy to the state replicator.
@@ -196,6 +201,8 @@ private:
      */
     ReplicaPtr mReplicaService;
 
+    ReplicaPrx mReplicaControlPrx;
+
     /**
      * Instance of our state replicator listener.
      */
@@ -322,11 +329,10 @@ private:
 /**
  * Constructor for the RTPMediaServiceImpl class.
  */
-RTPMediaServiceImpl::RTPMediaServiceImpl(const Ice::ObjectAdapterPtr& adapter, const ReplicaPtr& replicaService,
+RTPMediaServiceImpl::RTPMediaServiceImpl(const Ice::ObjectAdapterPtr& adapter, const ReplicaPrx& replicaService,
         const AsteriskSCF::Discovery::SmartProxy<RtpStateReplicatorPrx>& stateReplicator) :
     mAdapter(adapter), mEnvironment(PJMediaEnvironment::create(adapter->getCommunicator()->getProperties(), "")), 
-        mReplicaService(replicaService), mStateReplicator(stateReplicator)
-        
+        mReplicaControlPrx(replicaService), mStateReplicator(stateReplicator)
 {
 }
 
@@ -335,9 +341,8 @@ RTPMediaServiceImpl::RTPMediaServiceImpl(const Ice::ObjectAdapterPtr& adapter, c
  */
 RTPSessionPrx RTPMediaServiceImpl::allocate(const FormatSeq& formats, const Ice::Current&)
 {
-    RTPSessionImplPtr session =
-        new RTPSessionImpl(mAdapter, formats, mEnvironment->poolFactory(), mReplicaService, mStateReplicator);
-    return session->getProxy();
+    return AsteriskSCF::PJMediaRTP::RTPSession::create(mAdapter, IceUtil::generateUUID(), formats, mEnvironment,
+      mReplicaControlPrx, mStateReplicator);
 }
 
 /**
@@ -378,7 +383,8 @@ void MediaRTPpjmediaApp::start(const std::string&, const Ice::CommunicatorPtr& c
     mLocalAdapter = mCommunicator->createObjectAdapter("MediaRTPpjmediaAdapterLocal");
 
     mReplicaService = new ReplicaImpl(mLocalAdapter);
-    mLocalAdapter->add(mReplicaService, mCommunicator->stringToIdentity(ReplicaServiceId));
+    mReplicaControlPrx  = ReplicaPrx::uncheckedCast(mLocalAdapter->add(mReplicaService, 
+        mCommunicator->stringToIdentity(ReplicaServiceId)));
 
     mLocalAdapter->activate();
 
@@ -411,12 +417,12 @@ void MediaRTPpjmediaApp::start(const std::string&, const Ice::CommunicatorPtr& c
     }
 
     RTPMediaServiceImplPtr rtpmediaservice =
-        new RTPMediaServiceImpl(mGlobalAdapter, mReplicaService, mStateReplicator);
+        new RTPMediaServiceImpl(mGlobalAdapter, mReplicaControlPrx, mStateReplicator);
 
     if (mStateReplicator)
     {
         mReplicatorListener =
-            new RtpStateReplicatorListenerI(mGlobalAdapter, rtpmediaservice->getPoolFactory(), mGeneralState);
+            new RtpStateReplicatorListenerI(mGlobalAdapter, rtpmediaservice->getEnvironment(), mGeneralState);
         mReplicatorListenerProxy =
             RtpStateReplicatorListenerPrx::uncheckedCast(mLocalAdapter->addWithUUID(mReplicatorListener));
 
diff --git a/src/NATModule.cpp b/src/NATModule.cpp
index d788709..2e5e373 100644
--- a/src/NATModule.cpp
+++ b/src/NATModule.cpp
@@ -24,12 +24,13 @@ using namespace AsteriskSCF::PJMediaRTP;
 using namespace AsteriskSCF::System::V1;
 using namespace AsteriskSCF::PJUtil;
 
-NATModulePtr AsteriskSCF::PJMediaRTP::NATModule::create(const PJMediaEnvironmentPtr& environ)
+NATModulePtr AsteriskSCF::PJMediaRTP::NATModule::create(const PJMediaEnvironmentPtr& environ,
+    const PJMediaEndpointPtr& endpoint)
 {
     boost::shared_ptr<pj_ice_strans_cfg> transcfg(new pj_ice_strans_cfg);
     pj_ice_strans_cfg_default(transcfg.get());
     pj_stun_config_init(&transcfg->stun_cfg, environ->poolFactory(), 0,
-            pjmedia_endpt_get_ioqueue(environ->endpoint()), 0);
+            pjmedia_endpt_get_ioqueue(endpoint->endpoint()), 0);
     pj_status_t result = pj_timer_heap_create(environ->memoryPool(), environ->libConfig()->timerHeapSize(),
             &transcfg->stun_cfg.timer_heap);
     if (fail(result))
diff --git a/src/NATModule.h b/src/NATModule.h
index 9b4c4bd..f23484c 100644
--- a/src/NATModule.h
+++ b/src/NATModule.h
@@ -17,6 +17,7 @@
 #pragma once
 
 #include "PJMediaEnvironment.h"
+#include "PJMediaEndpoint.h"
 #include <boost/shared_ptr.hpp>
 
 //
@@ -36,7 +37,7 @@ class NATModule
 public:
     ~NATModule();
 
-    static NATModulePtr create(const PJMediaEnvironmentPtr& environ);
+    static NATModulePtr create(const PJMediaEnvironmentPtr& environ, const PJMediaEndpointPtr& endpoint);
 
 private:
     boost::shared_ptr<pj_ice_strans_cfg> mTransactionConfig;
diff --git a/src/PJMediaEndpoint.cpp b/src/PJMediaEndpoint.cpp
new file mode 100644
index 0000000..4794bfc
--- /dev/null
+++ b/src/PJMediaEndpoint.cpp
@@ -0,0 +1,45 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2010, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+
+#include "PJMediaEndpoint.h"
+#include "PJUtil.h"
+#include <AsteriskSCF/System/ExceptionsIf.h>
+#include <pjmedia.h>
+
+using namespace AsteriskSCF::PJMediaRTP;
+using namespace AsteriskSCF::System::V1;
+using namespace AsteriskSCF::PJUtil;
+
+PJMediaEndpoint::~PJMediaEndpoint()
+{
+    pjmedia_endpt_destroy(mEndpoint);
+}
+
+PJMediaEndpointPtr AsteriskSCF::PJMediaRTP::PJMediaEndpoint::create(const PJMediaEnvironmentPtr& environ)
+{
+    pjmedia_endpt* t;
+    pj_status_t result =  pjmedia_endpt_create(environ->poolFactory(), 0, 1, &t);
+    if (fail(result))
+    {
+        throw InternalInitializationException("Unable to create media endpoint!");
+    }
+    return PJMediaEndpointPtr(new PJMediaEndpoint(t));
+}
+
+PJMediaEndpoint::PJMediaEndpoint(pjmedia_endpt* endpoint) :
+    mEndpoint(endpoint)
+{
+}
diff --git a/src/NATModule.h b/src/PJMediaEndpoint.h
similarity index 62%
copy from src/NATModule.h
copy to src/PJMediaEndpoint.h
index 9b4c4bd..86ce0e4 100644
--- a/src/NATModule.h
+++ b/src/PJMediaEndpoint.h
@@ -20,38 +20,41 @@
 #include <boost/shared_ptr.hpp>
 
 //
-// Forward declarations.
+// forward declarations.
 //
-struct pj_ice_strans_cfg;
+struct pjmedia_endpt;
 
 namespace AsteriskSCF
 {
 namespace PJMediaRTP
 {
-class NATModule;
-typedef boost::shared_ptr<NATModule> NATModulePtr;
 
-class NATModule
+class PJMediaEndpoint;
+typedef boost::shared_ptr<PJMediaEndpoint> PJMediaEndpointPtr;
+
+class PJMediaEndpoint
 {
 public:
-    ~NATModule();
 
-    static NATModulePtr create(const PJMediaEnvironmentPtr& environ);
+    ~PJMediaEndpoint();
 
-private:
-    boost::shared_ptr<pj_ice_strans_cfg> mTransactionConfig;
+    pjmedia_endpt* endpoint() const
+    {
+        return mEndpoint;
+    }
 
-    NATModule(const boost::shared_ptr<pj_ice_strans_cfg>& transactionConfig);
+    static PJMediaEndpointPtr create(const PJMediaEnvironmentPtr& environ);
 
-    void destroyImpl();
-    void startImpl();
+private:
+    pjmedia_endpt* mEndpoint;
+
+    PJMediaEndpoint(pjmedia_endpt* endpoint);
 
     //
     // Hidden and unimplemented.
     //
-    NATModule(const NATModule&);
-    void operator=(const NATModule&);
+    PJMediaEndpoint(const PJMediaEndpoint&);
+    void operator=(const PJMediaEndpoint&);
 };
-
 } /* End of namespace PJMediaRTP */
 } /* End of namespace AsteriskSCF */
diff --git a/src/PJMediaEnvironment.cpp b/src/PJMediaEnvironment.cpp
index 12975d9..2596180 100644
--- a/src/PJMediaEnvironment.cpp
+++ b/src/PJMediaEnvironment.cpp
@@ -32,15 +32,18 @@ PJMediaEnvironmentPtr AsteriskSCF::PJMediaRTP::PJMediaEnvironment::create(const
     return PJMediaEnvironmentPtr(
         new PJMediaEnvironment(PJLibConfiguration::create(props, propertyPrefix),
                 ICEConfiguration::create(props, propertyPrefix),
-                NATConfig::create(props, propertyPrefix)));
+                NATConfig::create(props, propertyPrefix),
+                SRTPConfiguration::create(props, propertyPrefix)));
 }
 
 PJMediaEnvironment::PJMediaEnvironment(const PJLibConfigurationPtr& libConfig,
         const ICEConfigurationPtr& iceConfig,
-        const NATConfigPtr& natConfig) :
+        const NATConfigPtr& natConfig,
+        const SRTPConfigurationPtr& srtpConfig) :
     mPJLibConfig(libConfig),
     mICEConfig(iceConfig),
     mNATConfig(natConfig),
+    mSRTPConfig(srtpConfig),
     mCachingPool(new pj_caching_pool)
 {
     pj_caching_pool_init(mCachingPool.get(), & pj_pool_factory_default_policy, 0);
@@ -50,9 +53,4 @@ PJMediaEnvironment::PJMediaEnvironment(const PJLibConfigurationPtr& libConfig,
     // TODO: should these values come from configuration.
     //
     mMemoryPool = pj_pool_create(mPoolFactory, "media_rtp_pjmedia", 1024, 1024, 0);
-    pj_status_t result = pjmedia_endpt_create(mPoolFactory, 0, 1, &mEndpoint);
-    if (fail(result))
-    {
-        throw InternalInitializationException("Unable to create PJMEDIA endpoint.");
-    }
 }
diff --git a/src/PJMediaEnvironment.h b/src/PJMediaEnvironment.h
index d1dc494..2dea117 100644
--- a/src/PJMediaEnvironment.h
+++ b/src/PJMediaEnvironment.h
@@ -19,6 +19,7 @@
 #include "PJLibConfiguration.h"
 #include "NATConfig.h"
 #include "ICEConfiguration.h"
+#include "SRTPConfiguration.h"
 
 #include <Ice/PropertiesF.h>
 #include <string>
@@ -59,14 +60,14 @@ public:
         return mICEConfig;
     }
 
-    pj_pool_factory* poolFactory() const
+    SRTPConfigurationPtr srtpConfig() const
     {
-        return mPoolFactory;
+        return mSRTPConfig;
     }
 
-    pjmedia_endpt* endpoint() const
+    pj_pool_factory* poolFactory() const
     {
-        return mEndpoint;
+        return mPoolFactory;
     }
 
     pj_pool_t* memoryPool() const
@@ -74,6 +75,7 @@ public:
         return mMemoryPool;
     }
 
+
     static PJMediaEnvironmentPtr create(const Ice::PropertiesPtr& props, const std::string& prefix);
 
 private:
@@ -81,6 +83,7 @@ private:
     PJLibConfigurationPtr mPJLibConfig;
     ICEConfigurationPtr mICEConfig;
     NATConfigPtr mNATConfig;
+    SRTPConfigurationPtr mSRTPConfig;
 
     pj_pool_factory* mPoolFactory;
     pjmedia_endpt* mEndpoint;
@@ -89,7 +92,8 @@ private:
 
     PJMediaEnvironment(const PJLibConfigurationPtr& libConfig,
             const ICEConfigurationPtr& iceConfig,
-            const NATConfigPtr& natconfig);
+            const NATConfigPtr& natconfig,
+            const SRTPConfigurationPtr& srtpConfig);
 };
 
 } /* End of namespace PJMediaRTP */
diff --git a/src/PJMediaTransport.cpp b/src/PJMediaTransport.cpp
new file mode 100644
index 0000000..803c4c3
--- /dev/null
+++ b/src/PJMediaTransport.cpp
@@ -0,0 +1,61 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2010, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+
+#include "PJMediaTransport.h"
+#include <pjmedia.h>
+#include <pjlib.h>
+#include <string>
+
+using namespace std;
+using namespace AsteriskSCF::PJMediaRTP;
+using namespace AsteriskSCF::Helpers;
+
+PJMediaTransport::~PJMediaTransport()
+{
+    pjmedia_transport_close(mTransport);
+}
+
+pjmedia_transport* PJMediaTransport::getTransport() const
+{
+    return mTransport;
+}
+
+PJMediaTransport::PJMediaTransport(pjmedia_transport* t) :
+    mTransport(t)
+{
+}
+
+AddressPtr PJMediaTransport::localAddress() const
+{
+    pjmedia_transport_info info;
+    pjmedia_transport_info_init(&info);
+    pjmedia_transport_get_info(mTransport, &info);
+
+    string address = pj_inet_ntoa(info.sock_info.rtp_addr_name.ipv4.sin_addr);
+    int port = pj_ntohs(info.sock_info.rtp_addr_name.ipv4.sin_port);
+    return AddressPtr(new Address(address, port));
+}
+
+AddressPtr PJMediaTransport::remoteAddress() const
+{
+    pjmedia_transport_info info;
+    pjmedia_transport_info_init(&info);
+    pjmedia_transport_get_info(mTransport, &info);
+
+    string address = pj_inet_ntoa(info.src_rtp_name.ipv4.sin_addr);
+    int port = pj_ntohs(info.src_rtp_name.ipv4.sin_port);
+    return AddressPtr(new Address(address, port));
+}
diff --git a/src/NATModule.h b/src/PJMediaTransport.h
similarity index 58%
copy from src/NATModule.h
copy to src/PJMediaTransport.h
index 9b4c4bd..fea7ae7 100644
--- a/src/NATModule.h
+++ b/src/PJMediaTransport.h
@@ -16,41 +16,43 @@
 
 #pragma once
 
-#include "PJMediaEnvironment.h"
+#include <AsteriskSCF/Helpers/Network.h>
 #include <boost/shared_ptr.hpp>
 
 //
 // Forward declarations.
 //
-struct pj_ice_strans_cfg;
+struct pjmedia_transport;
 
 namespace AsteriskSCF
 {
 namespace PJMediaRTP
 {
-class NATModule;
-typedef boost::shared_ptr<NATModule> NATModulePtr;
 
-class NATModule
+class PJMediaTransport;
+typedef boost::shared_ptr<PJMediaTransport> PJMediaTransportPtr;
+
+class PJMediaTransport
 {
 public:
-    ~NATModule();
 
-    static NATModulePtr create(const PJMediaEnvironmentPtr& environ);
+    virtual ~PJMediaTransport();
+
+    pjmedia_transport* getTransport() const;
 
-private:
-    boost::shared_ptr<pj_ice_strans_cfg> mTransactionConfig;
+    virtual AsteriskSCF::Helpers::AddressPtr localAddress() const;
+    virtual AsteriskSCF::Helpers::AddressPtr remoteAddress() const;
 
-    NATModule(const boost::shared_ptr<pj_ice_strans_cfg>& transactionConfig);
+protected:
+    pjmedia_transport* mTransport;
 
-    void destroyImpl();
-    void startImpl();
+    PJMediaTransport(pjmedia_transport* t);
 
     //
     // Hidden and unimplemented.
     //
-    NATModule(const NATModule&);
-    void operator=(const NATModule&);
+    PJMediaTransport(const PJMediaTransport&);
+    void operator=(const PJMediaTransport&);
 };
 
 } /* End of namespace PJMediaRTP */
diff --git a/src/RTPICESession.cpp b/src/RTPICESession.cpp
index fc3035b..48d064e 100644
--- a/src/RTPICESession.cpp
+++ b/src/RTPICESession.cpp
@@ -21,8 +21,15 @@
 
 #include "RTPSource.h"
 #include "RTPSink.h"
+#include "PJMediaEndpoint.h"
+#include "PJMediaTransport.h"
 #include "ServiceConfig.h"
 #include "NATConfig.h"
+#include "SessionAdapter.h"
+
+#include "UDPTransport.h"
+#include "ICETransport.h"
+#include "SRTPTransport.h"
 
 #include <pjlib.h>
 #include <pjmedia.h>
@@ -33,9 +40,12 @@
 #include <AsteriskSCF/System/ExceptionsIf.h>
 #include <AsteriskSCF/logger.h>
 #include <AsteriskSCF/Helpers/Network.h>
+#include <AsteriskSCF/System/Component/ReplicaIf.h>
+#include <AsteriskSCF/Media/RTP/MediaRTPIf.h>
 
 using namespace std;
 using namespace AsteriskSCF::System::V1;
+using namespace AsteriskSCF::System::Component::V1;
 using namespace AsteriskSCF::Media::V1;
 using namespace AsteriskSCF::Media::RTP::V1;
 using namespace AsteriskSCF::Helpers;
@@ -48,15 +58,12 @@ namespace
 Logger logger = getLoggerFactory().getLogger("AsteriskSCF.MediaRTP");
 }
 
-//
-// Ideally this would be an anonymous namespace, but some compilers seem to have an issue with them
-//
-namespace ICEMediaSession
+namespace 
 {
 
 typedef AsteriskSCF::Discovery::SmartProxy<RtpStateReplicatorPrx> StateReplicatorPrx;
 
-class RTPSessionImpl : public RTPSession
+class RTPSessionImpl : public AsteriskSCF::Media::RTP::V1::RTPSession
 {
 public:
 
@@ -65,12 +72,21 @@ public:
     // amongst constructor versions and avoid exception prone calls in
     // the constructor itself.
     //
-    RTPSessionImpl(const FormatSeq& formats, const PJMediaEnvironmentPtr& pjLib,
-            const StateReplicatorPrx& replicator);
+    RTPSessionImpl(const Ice::ObjectAdapterPtr& adapter, const FormatSeq& formats, 
+      const PJMediaEnvironmentPtr& environment,
+      const StateReplicatorPrx& replicator, const string& id, const Ice::PropertiesPtr& properties,
+      const ReplicaPrx& replicaControl);
 
-    void initialize();
+    /**
+     * Instantiate the object by way of normal session creation. 
+     **/
+    RTPSessionPrx activate(const string& id);
 
-    void destroy();
+    /**
+     * Instantiate the object by way of replication.
+     **/
+    RTPSessionPrx activate(const Ice::Identity& id, const Ice::Identity& sourceId,
+      const Ice::Identity& sinkId);
 
     //
     // Slice to C++ mapping.
@@ -91,361 +107,415 @@ public:
     RTCPSessionPrx getRTCPSession(const Ice::Current& current);
     void release(const Ice::Current& current);
 
-    void onComplete(pj_ice_strans_op operationStatus, pj_status_t status);
+    void replicateState(const AsteriskSCF::Media::RTP::V1::RtpSessionStateItemPtr&,
+            const AsteriskSCF::Media::RTP::V1::RtpStreamSinkStateItemPtr&,
+            const AsteriskSCF::Media::RTP::V1::RtpStreamSourceStateItemPtr&);
+
+    void removeState(const AsteriskSCF::Media::RTP::V1::RtpSessionStateItemPtr&,
+            const AsteriskSCF::Media::RTP::V1::RtpStreamSinkStateItemPtr&,
+            const AsteriskSCF::Media::RTP::V1::RtpStreamSourceStateItemPtr&);
+
+    void setRemoteDetails(const std::string& host, int port);
+
+    FormatSeq getFormats()
+    {
+        boost::shared_lock<boost::shared_mutex> lock(mLock);
+        return mFormats;
+    }
+
+    FormatPtr getFormat(int payload)
+    {
+        boost::shared_lock<boost::shared_mutex> lock(mLock);
+        PayloadMap::iterator it = mSessionStateItem->mPayloadstoFormats.find(payload);
+        if (it == mSessionStateItem->mPayloadstoFormats.end())
+        {
+            return 0;
+        }
+        return it->second;
+    }
+
+    int getPayload(const FormatPtr& format)
+    {
+        boost::shared_lock<boost::shared_mutex> lock(mLock);
+        map<string, int>::iterator it = mFormatsToPayloads.find(format->name);
+        if (it == mFormatsToPayloads.end())
+        {
+            return -1;
+        }
+        return it->second;
+    }
+
+    void destroy();
+    void associatePayloadsImpl(const AsteriskSCF::Media::RTP::V1::PayloadMap& mappings);
+
+    StreamSourceRTPImplPtr getSourceServant()
+    {
+        return mSource.servant;
+    }
+
+    StreamSinkRTPImplPtr getSinkServant()
+    {
+        return mSink.servant;
+    }
 
-    //
-    // Internal methods.
-    //
 private:
     boost::shared_mutex mLock;
+    Ice::ObjectAdapterPtr mAdapter;
     FormatSeq mFormats;
     PJMediaEnvironmentPtr mMediaEnvironment;
+    PJMediaEndpointPtr mEndpoint;
     StateReplicatorPrx mReplicator;
+    ReplicaPrx mReplicaControl;
+    PJMediaTransportPtr mTransport;
+    string mId;
+    RtpSessionStateItemPtr mSessionStateItem;
+    map<string, int> mFormatsToPayloads;
+
+    struct SourceRecord
+    {
+        StreamSourceRTPImplPtr servant;
+        StreamSourcePrx proxy;
+    };
+    SourceRecord mSource;
+
+    struct SinkRecord
+    {
+        StreamSinkRTPImplPtr servant;
+        StreamSinkPrx proxy;
+    };
+    SinkRecord mSink;
 };
 
 typedef IceUtil::Handle<RTPSessionImpl> RTPSessionImplPtr;
 
-class ICECallbackAdapter
+class SessionAdapterImpl : public SessionAdapter
 {
-
 public:
-    static void addEntry(pjmedia_transport* transport, const RTPSessionImplPtr& callback);
-    static void onICEComplete(pjmedia_transport* transport, pj_ice_strans_op operation, pj_status_t status);
 
-    struct CallbackRecord
+    SessionAdapterImpl(const RTPSessionImplPtr& servant) :
+        mServant(servant)
     {
-        bool connected;
-        RTPSessionImplPtr session;
-    };
-    typedef std::map<pjmedia_transport*, CallbackRecord> TransportMap;
-private:
+    }
+
+    void replicateState(const AsteriskSCF::Media::RTP::V1::RtpStreamSinkStateItemPtr& sinkStateItem) 
+    {
+        mServant->replicateState(0, sinkStateItem, 0);
+    }
 
-    static TransportMap mTransportMap;
-    static boost::shared_mutex mLock;
+    void replicateState(const AsteriskSCF::Media::RTP::V1::RtpStreamSourceStateItemPtr& sourceStateItem)
+    {
+        mServant->replicateState(0, 0, sourceStateItem);
+    }
+
+    AsteriskSCF::Media::V1::FormatPtr getFormat(int payload) 
+    {
+        return mServant->getFormat(payload);
+    }
+
+    int getPayload(const AsteriskSCF::Media::V1::FormatPtr& format) 
+    {
+        return mServant->getPayload(format);
+    }
+
+    AsteriskSCF::Media::V1::FormatSeq getFormats()
+    {
+        return mServant->getFormats();
+    }
+
+    void setRemoteDetails(const std::string& host, int port)
+    {
+        mServant->setRemoteDetails(host, port);
+    }
+
+private:
+    RTPSessionImplPtr mServant;
 };
 
-class SinkImpl : public StreamSinkRTP
+RTPSessionImpl::RTPSessionImpl(const Ice::ObjectAdapterPtr& adapter, const FormatSeq& formats, 
+  const PJMediaEnvironmentPtr& environment,
+  const StateReplicatorPrx& replicator, const string& id, const Ice::PropertiesPtr& properties, 
+  const ReplicaPrx& replicaControl) :
+    mAdapter(adapter),
+    mFormats(formats),
+    mMediaEnvironment(environment),
+    mEndpoint(PJMediaEndpoint::create(environment)), // kept in member declaration order
+    mReplicator(replicator),
+    mReplicaControl(replicaControl),
+    mId(id),
+    mSessionStateItem(new RtpSessionStateItem)
 {
-public:
+    if (environment->natConfig() && environment->natConfig()->isSTUNEnabled()) // STUN enabled: ICE transport
+    {
+        mTransport = ICETransport::create(mEndpoint);
+    }
+    else
+    {
+        mTransport = UDPTransport::create(mEndpoint, properties);
+    }
 
-    SinkImpl();
+    if (environment->srtpConfig() && environment->srtpConfig()->isSRTPEnabled()) // SRTP is built from the transport above
+    {
+        mTransport = SRTPTransport::create(mTransport, mEndpoint, properties);
+    }
+    SessionAdapterPtr sessionAdapter(new SessionAdapterImpl(this)); // shared by the source and sink servants
+    mSource.servant = new StreamSourceRTPImpl(sessionAdapter, mTransport, mId);
+    mSink.servant = new StreamSinkRTPImpl(sessionAdapter, mTransport, mId); 
+}
 
-    //
-    // Slice to C++ mapping.
-    //
+RTPSessionPrx RTPSessionImpl::activate(const string& id)
+{
+    Ice::Identity sourceId(mAdapter->getCommunicator()->stringToIdentity(IceUtil::generateUUID()));
+    Ice::Identity sinkId(mAdapter->getCommunicator()->stringToIdentity(IceUtil::generateUUID()));
+    return activate(mAdapter->getCommunicator()->stringToIdentity(id), sourceId, sinkId);
+}
 
-    //
-    // StreamSink
-    //
-    void write(const FrameSeq& frames, const Ice::Current& current);
-    void setSource(const StreamSourcePrx& source, const Ice::Current& current);
-    StreamSourcePrx getSource(const Ice::Current& current);
-    FormatSeq getFormats(const Ice::Current& current);
-    std::string getId(const Ice::Current& current);
+RTPSessionPrx RTPSessionImpl::activate(const Ice::Identity& id,
+        const Ice::Identity& sourceId, const Ice::Identity& sinkId)
+{
+    mSource.proxy = StreamSourcePrx::uncheckedCast(mAdapter->add(mSource.servant, sourceId)); // source servant, source id
+    mSink.proxy = StreamSinkPrx::uncheckedCast(mAdapter->add(mSink.servant, sinkId)); // sink servant, sink id
 
-    //
-    // StreamSinkRTP
-    //
-    void setRemoteDetails(const std::string& address, int port, const Ice::Current& current);
-    std::string getRemoteAddress(const Ice::Current& current);
-    int getRemotePort(const Ice::Current& current);
+    mSessionStateItem->key = mSessionStateItem->mSessionId = mId; // state item is keyed by the session id
+    mSessionStateItem->mSessionIdentity = id;
+    mSessionStateItem->mFormats = mFormats;
+    mSessionStateItem->mSourceIdentity = sourceId;
+    mSessionStateItem->mSinkIdentity = sinkId;
+    replicateState(mSessionStateItem, mSink.servant->getStateItem(), mSource.servant->getStateItem());
 
-    //
-    // Internal methods.
-    //
-    void destroy();
-};
+    return RTPSessionPrx::uncheckedCast(mAdapter->add(this, id)); // the session itself is registered under id
+}
 
-class SourceImpl : public StreamSourceRTP
+StreamSourceSeq RTPSessionImpl::getSources(const Ice::Current&)
 {
-public:
-    SourceImpl();
+    StreamSourceSeq result;
+    result.push_back(mSource.proxy);
+    return result;
+}
 
-    //
-    // Slice to C++ mapping.
-    //
+StreamSinkSeq RTPSessionImpl::getSinks(const Ice::Current&)
+{
+    StreamSinkSeq result;
+    result.push_back(mSink.proxy);
+    return result;
+}
 
-    //
-    // StreamSource
-    //
-    void setSink(const StreamSinkPrx& destination, const Ice::Current& current);
-    StreamSinkPrx getSink(const Ice::Current& current);
-    FormatSeq getFormats(const Ice::Current& current);
-    std::string getId( const Ice::Current& current);
-    void requestFormat(const FormatPtr& newFormat, const Ice::Current& current);
+std::string RTPSessionImpl::getId(const Ice::Current&)
+{
+    return mId;
+}
 
+void RTPSessionImpl::associatePayloads(const AsteriskSCF::Media::RTP::V1::PayloadMap& mappings,
+    const Ice::Current&)
+{
+    associatePayloadsImpl(mappings);
+}
+
+void RTPSessionImpl::useRTCP(bool, const Ice::Current&)
+{
     //
-    // Internal methods.
+    // No-op for the moment.
     //
-    void destroy();
-};
+}
 
-class PJMediaTransport
+RTCPSessionPrx RTPSessionImpl::getRTCPSession(const Ice::Current&)
 {
-public:
-    virtual ~PJMediaTransport() {}
+    return RTCPSessionPrx();
+}
+
+void RTPSessionImpl::release(const Ice::Current&) // Ice::Current is unused, so it is left unnamed
+{
+    destroy(); // release is implemented entirely by destroy()
+}
 
-    pjmedia_transport* getTransport() const
+void RTPSessionImpl::replicateState(const RtpSessionStateItemPtr& session, 
+  const RtpStreamSinkStateItemPtr& sink,
+  const RtpStreamSourceStateItemPtr& source)
+{
+    // If state replication has been disabled do nothing
+    if (!mReplicator || mReplicaControl->isActive() == false)
     {
-        return mTransport;
+	return;
     }
 
-protected:
-
-    pjmedia_transport* mTransport;
+    RtpStateItemSeq items;
 
-    PJMediaTransport(pjmedia_transport* t) :
-         mTransport(t)
+    if (session)
     {
+	items.push_back(session);
     }
-};
 
-typedef boost::shared_ptr<PJMediaTransport> PJMediaTransportPtr;
+    if (sink)
+    {
+	items.push_back(sink);
+    }
 
-//
-// ICE transport implementation.
-//
-class ICETransport;
-typedef boost::shared_ptr<ICETransport> ICETransportPtr;
-typedef boost::shared_ptr<pjmedia_ice_cb> PJICECallbackPtr;
+    if (source)
+    {
+	items.push_back(source);
+    }
 
-class ICETransport : public PJMediaTransport
-{
-    //
-    // Hidden and unimplemented.
-    //
-    ICETransport(const ICETransport&);
-    void operator=(const ICETransport&);
-    
-public:
-    static ICETransportPtr create(const PJMediaEnvironmentPtr& environment)
-    {
-        pjmedia_transport* t;
-        PJICECallbackPtr callback(new pjmedia_ice_cb);
-        callback->on_ice_complete = &ICECallbackAdapter::onICEComplete;
-        pj_status_t result = pjmedia_ice_create(environment->endpoint(), "ASCF_ICE_MEDIA", 10, 0, callback.get(),
-                &t);
-        if (fail(result))
-        {
-            throw InternalInitializationException("Unable to create new ICE media transport");
-        }
+    if (items.size() == 0)
+    {
+	return;
     }
 
-    ICETransport(pjmedia_transport* t, const PJICECallbackPtr& cb) :
-        PJMediaTransport(t),
-        mCallback(cb)
+    try
     {
+	RtpStateReplicatorPrx oneway = RtpStateReplicatorPrx::uncheckedCast(mReplicator->ice_oneway());
+	oneway->setState(items);
     }
+    catch (...)
+    {
+    }
+}
 
-private:
-    PJICECallbackPtr mCallback;
-};
+void RTPSessionImpl::removeState(const RtpSessionStateItemPtr& session, const RtpStreamSinkStateItemPtr& sink,
+        const RtpStreamSourceStateItemPtr& source)
+{
+    // If state replication has been disabled do nothing
+    if (!mReplicator || mReplicaControl->isActive() == false)
+    {
+        return;
+    }
 
-//
-// UDP transport implementation.
-//
-class UDPTransport;
-typedef boost::shared_ptr<UDPTransport> UDPTransportPtr;
+    Ice::StringSeq items;
 
-class UDPTransport : public PJMediaTransport
-{
-    //
-    // Hidden and unimplemented.
-    //
-    UDPTransport(const UDPTransport&);
-    void operator=(const UDPTransport&);
-    
-public:
-    //
-    // Replicant version.
-    //
-    static UDPTransportPtr create(const PJMediaEnvironmentPtr& environment, Ice::Int port)
+    if (session)
     {
-        pjmedia_transport* transport;
-        pj_status_t result = pjmedia_transport_udp_create2(environment->endpoint(), "RTP", 0, port, 0, &transport);
-        if (fail(result))
-        {
-            throw InternalInitializationException("Unable to initialize UDP media transport on "
-                    "replicant, port conflict!");
-        }
-        return UDPTransportPtr(new UDPTransport(transport));
+	items.push_back(session->key);
     }
-    
-    //
-    // Primary version.
-    //
-    static UDPTransportPtr create(const PJMediaEnvironmentPtr& pjlib, const Ice::PropertiesPtr& properties)
-    {
-        const Ice::Int minPort =
-            properties->getPropertyAsIntWithDefault("AsteriskSCF.Media.Transport.UDPTransport.MinPort",
-                DefaultRTPPortMinimum);
-        const Ice::Int maxPort =
-            properties->getPropertyAsIntWithDefault("AsteriskSCF.Media.Transport.UDPTransport.MaxPort",
-                DefaultRTPPortMaximum);
-
-        //
-        // Now create some transport we can use to actually send or receive the media. The candidate ports increment by
-        // two to allow "room" for both a control and a data port.
-        //
-        for (Ice::Int port = minPort; port < maxPort && port <= USHRT_MAX ; port +=2)
-        {
-            pjmedia_transport* transport;
-            pj_status_t result = pjmedia_transport_udp_create2(pjlib->endpoint(), "RTP", 0, port, 0, &transport);
-            if (success(result))
-            {
-                return UDPTransportPtr(new UDPTransport(transport));
-            }
-        }
-        throw InternalInitializationException("Unable to initialize UDP media transport");
+
+    if (sink)
+    {
+	items.push_back(sink->key);
     }
 
-private:
-    UDPTransport(pjmedia_transport* t) :
-        PJMediaTransport(t)
+    if (source)
     {
+	items.push_back(source->key);
     }
-};
 
+    if (items.size() == 0)
+    {
+	return;
+    }
 
-//
-// For some reason the ICE media transport doesn't have the concept of associating user data with a transport, so we
-// have to map it "out of band". The problem is, there is a race condition in that the ICE completion callack could be
-// invoked before we get a chance to add the transport.  The solution to that is to allow an entry to be created when
-// the ICE completion callback arrives and there isn't a table entry. When the addEntry runs, it will see the entry and
-// simply update the appropriate field.
-// 
-void ICECallbackAdapter::addEntry(pjmedia_transport* transport, const RTPSessionImplPtr& callback)
-{
-    boost::unique_lock<boost::shared_mutex> lock(mLock);
-    TransportMap::iterator i = mTransportMap.find(transport);
-    if (i != mTransportMap.end())
+    try
     {
-        i->second.session = callback;
+	RtpStateReplicatorPrx oneway = RtpStateReplicatorPrx::uncheckedCast(mReplicator->ice_oneway());
+	oneway->removeState(items);
     }
-    else
+    catch (...)
     {
-        CallbackRecord r;
-        r.connected = false;
-        r.session = callback;
-        mTransportMap.insert(make_pair(transport, r));
     }
+}
 
+void RTPSessionImpl::setRemoteDetails(const std::string& host, int port)
+{
+    mSource.servant->setRemoteDetails(host, port);
 }
 
-void ICECallbackAdapter::onICEComplete(pjmedia_transport* transport, pj_ice_strans_op operation, pj_status_t status)
+void RTPSessionImpl::destroy()
 {
-    //
-    // AFAICT, only PJ_ICE_STRANS_OP_NEGOTIATION should get here.
-    // 
-    switch (operation)
-    {
-        //
-        // TODO: Fill out.
-        //
-        case PJ_ICE_STRANS_OP_INIT:
-            //
-            // Initialization is complete. FWICT, this should not get here.
-            //
-            break;
-        case PJ_ICE_STRANS_OP_NEGOTIATION:
-            //
-            // Negotiation is complete.
-            //
+    boost::unique_lock<boost::shared_mutex> lock(mLock);
+    if (mSource.proxy)
+    {
+        try
         {
-            boost::unique_lock<boost::shared_mutex> lock(mLock);
-            TransportMap::iterator i = mTransportMap.find(transport);
-            if (i == mTransportMap.end())
-            {
-                CallbackRecord r;
-                r.connected = true;
-                mTransportMap.insert(make_pair(transport, r));
-            }
-            else
-            {
-                i->second.connected = true;
-                i->second.session->onComplete(operation, status);
-            }
+            mAdapter->remove(mSource.proxy->ice_getIdentity());
         }
-            break;
-        case PJ_ICE_STRANS_OP_KEEP_ALIVE:
-            //
-            // Keep alive has successfully completed. FWICT this should not get here.
-            // 
-            break;
-    };
-}
-
-}
-
-//
-// Static member initializations.
-//
-ICEMediaSession::ICECallbackAdapter::TransportMap ICEMediaSession::ICECallbackAdapter::mTransportMap;
-boost::shared_mutex ICEMediaSession::ICECallbackAdapter::mLock;
+        catch (...)
+        {
+        }
+    }
 
-using namespace ICEMediaSession;
+    if (mSink.proxy)
+    {
+        try
+        {
+            mAdapter->remove(mSink.proxy->ice_getIdentity());
+        }
+        catch (...)
+        {
+        }
+    }
 
-ICEMediaSession::RTPSessionImpl::RTPSessionImpl(const FormatSeq& formats, const PJMediaEnvironmentPtr& environment,
-    const StateReplicatorPrx& replicator) :
-    mFormats(formats),
-    mMediaEnvironment(environment),
-    mReplicator(replicator)
-{
+    mSource = SourceRecord();
+    mSink = SinkRecord();
+    try
+    {
+        mAdapter->remove(mAdapter->getCommunicator()->stringToIdentity(mId));
+    }
+    catch (...)
+    {
+    }
 }
 
-void ICEMediaSession::RTPSessionImpl::initialize()
+void RTPSessionImpl::associatePayloadsImpl(const AsteriskSCF::Media::RTP::V1::PayloadMap& mappings)
 {
+    boost::unique_lock<boost::shared_mutex> lock(mLock); // exclusive: both lookup maps are rewritten below
 
-}
+    mSessionStateItem->mPayloadstoFormats = mappings;
+    for (PayloadMap::const_iterator it = mappings.begin(); it != mappings.end(); ++it)
+    {
+        mFormatsToPayloads[it->second->name] = it->first; // assign, not insert(): a re-mapped format must get its new payload
+    }
 
-void ICEMediaSession::RTPSessionImpl::destroy()
-{
+    // Only the session has changed so push a single update out for it
+    replicateState(mSessionStateItem, 0, 0);
 }
 
-StreamSourceSeq ICEMediaSession::RTPSessionImpl::getSources(const Ice::Current&)
-{
-    return StreamSourceSeq();
 }
 
-StreamSinkSeq ICEMediaSession::RTPSessionImpl::getSinks(const Ice::Current&)
+RTPSessionPrx AsteriskSCF::PJMediaRTP::RTPSession::create(const Ice::ObjectAdapterPtr& adapter,
+        const std::string& id, const FormatSeq& formats, const PJMediaEnvironmentPtr& environment, 
+        const AsteriskSCF::System::Component::V1::ReplicaPrx& replicaControl,
+        const AsteriskSCF::Discovery::SmartProxy<RtpStateReplicatorPrx>& stateReplicator)
 {
-    return StreamSinkSeq();
+    RTPSessionImplPtr servant(new RTPSessionImpl(adapter, formats, environment,
+                    stateReplicator, id, adapter->getCommunicator()->getProperties(), replicaControl));
+    return servant->activate(id);
 }
 
-std::string ICEMediaSession::RTPSessionImpl::getId(const Ice::Current&)
+class ReplicationAdapterImpl : public ReplicationAdapter
 {
-    return "";
-}
+public:
 
-void ICEMediaSession::RTPSessionImpl::associatePayloads(const AsteriskSCF::Media::RTP::V1::PayloadMap& mappings,
-    const Ice::Current&)
-{
-}
+    void update(const RtpStreamSinkStateItemPtr& item)
+    {
+        mImpl->getSinkServant()->setSourceImpl(item->mSource);
+        mImpl->getSinkServant()->setRemoteDetailsImpl(item->mRemoteAddress, item->mRemotePort);
+        mImpl->getSourceServant()->setRemoteDetails(item->mRemoteAddress, item->mRemotePort);
+    }
 
-void ICEMediaSession::RTPSessionImpl::useRTCP(bool enable, const Ice::Current&)
-{
-}
+    void update(const RtpStreamSourceStateItemPtr& item)
+    {
+        mImpl->getSourceServant()->setSinkImpl(item->mSink);
+    }
 
-RTCPSessionPrx ICEMediaSession::RTPSessionImpl::getRTCPSession(const Ice::Current&)
-{
-    return 0;
-}
+    void destroy()
+    {
+        mImpl->destroy();
+    }
 
-void ICEMediaSession::RTPSessionImpl::release(const Ice::Current&)
-{
-}
+    ReplicationAdapterImpl(const RTPSessionImplPtr& impl) :
+        mImpl(impl)
+    {
+    }
+private:
 
-void ICEMediaSession::RTPSessionImpl::onComplete(pj_ice_strans_op operationStatus, pj_status_t status)
-{
-}
+    RTPSessionImplPtr mImpl;
+};
 
-RTPSessionPrx AsteriskSCF::PJMediaRTP::RTPICESession::create(const Ice::ObjectAdapterPtr& adapter,
-        const Ice::Identity& id,
-        const FormatSeq& formats, const PJMediaEnvironmentPtr& environment, 
-        const AsteriskSCF::Discovery::SmartProxy<RtpStateReplicatorPrx>& stateReplicator)
+ReplicationAdapterPtr AsteriskSCF::PJMediaRTP::RTPSession::create(const Ice::ObjectAdapterPtr& adapter,
+  const PJMediaEnvironmentPtr& environment, 
+  const RtpSessionStateItemPtr& item)
 {
-    ICEMediaSession::RTPSessionImplPtr servant(new ICEMediaSession::RTPSessionImpl(formats, environment,
-                    stateReplicator));
-    assert(adapter);
-    return RTPSessionPrx::uncheckedCast(adapter->add(servant, id));
+    RTPSessionImplPtr servant(new RTPSessionImpl(adapter, item->mFormats, environment,
+                    AsteriskSCF::Discovery::SmartProxy<RtpStateReplicatorPrx>(), 
+                    adapter->getCommunicator()->identityToString(item->mSessionIdentity), 
+                    adapter->getCommunicator()->getProperties(), 0));
+    servant->activate(item->mSessionIdentity, item-> mSourceIdentity, item->mSinkIdentity);
+    servant->associatePayloadsImpl(item->mPayloadstoFormats);
+    return ReplicationAdapterPtr(new ReplicationAdapterImpl(servant));
 }
diff --git a/src/RTPICESession.h b/src/RTPICESession.h
index bf6e583..46e2b42 100644
--- a/src/RTPICESession.h
+++ b/src/RTPICESession.h
@@ -16,12 +16,17 @@
 
 #pragma once
 
+#include "RtpStateReplicationIf.h"
+#include "PJMediaEnvironment.h"
+
+#include <Ice/ObjectAdapterF.h>
 #include <AsteriskSCF/Media/MediaIf.h>
 #include <AsteriskSCF/Media/RTP/MediaRTPIf.h>
 #include <AsteriskSCF/System/Component/ReplicaIf.h>
 #include <AsteriskSCF/Discovery/SmartProxy.h>
-#include "RtpStateReplicationIf.h"
-#include "PJMediaEnvironment.h"
+#include <AsteriskSCF/System/Component/ReplicaIf.h>
+
+#include "ReplicationAdapter.h"
 
 //
 // Forward declarations.
@@ -32,17 +37,22 @@ namespace AsteriskSCF
 {
 namespace PJMediaRTP
 {
-
-class RTPICESession
+class RTPSession
 {
 public:
     static AsteriskSCF::Media::RTP::V1::RTPSessionPrx create(const Ice::ObjectAdapterPtr& objectAdapter,
-            const Ice::Identity& id,
+            const std::string& id,
             const AsteriskSCF::Media::V1::FormatSeq& formats,
             const PJMediaEnvironmentPtr& environment,
+            const AsteriskSCF::System::Component::V1::ReplicaPrx& replicaControl,
             const AsteriskSCF::Discovery::SmartProxy<
                 AsteriskSCF::Media::RTP::V1::RtpStateReplicatorPrx>& stateReplicator
     );
+
+    static ReplicationAdapterPtr create(const Ice::ObjectAdapterPtr& objectAdapter,
+        const PJMediaEnvironmentPtr& environment,
+        const AsteriskSCF::Media::RTP::V1::RtpSessionStateItemPtr& update
+    );
 };
 } /* End of namespace RTPMedia */
 } /* End of namespace AsteriskSCF */
diff --git a/src/RTPSink.cpp b/src/RTPSink.cpp
index 1903ff5..69fea7c 100644
--- a/src/RTPSink.cpp
+++ b/src/RTPSink.cpp
@@ -17,10 +17,6 @@
 #include "RTPSink.h"
 
 #include <IceUtil/UUID.h>
-
-
-#include <AsteriskSCF/Discovery/SmartProxy.h>
-
 #include <pjlib.h>
 #include <pjmedia.h>
 
@@ -28,6 +24,7 @@ using namespace std;
 using namespace AsteriskSCF::Core::Discovery::V1;
 using namespace AsteriskSCF::Media::V1;
 using namespace AsteriskSCF::Media::RTP::V1;
+using namespace AsteriskSCF::PJMediaRTP;
 
 /**
  * Private implementation details for the StreamSinkRTPImpl class.
@@ -38,7 +35,16 @@ public:
     /**
      * Constructor for our StreamSinkRTPImplPriv class.
      */
-    StreamSinkRTPImplPriv(const RTPSessionImplPtr&, const std::string&);
+    StreamSinkRTPImplPriv(const SessionAdapterPtr& sessionAdapter, const PJMediaTransportPtr& transport, 
+      const std::string& id):
+        mSession(sessionAdapter), mTransport(transport), 
+        mSinkStateItem(new RtpStreamSinkStateItem)
+    {
+        pjmedia_rtp_session_init(&mOutgoingSession, 0, pj_rand());
+        mSinkStateItem->mSessionId = id;
+        mSinkStateItem->key = IceUtil::generateUUID();
+        mSinkStateItem->mRemotePort = 0;
+    }
 
     /**
      * A structure containing outgoing pjmedia session data.
@@ -48,7 +54,9 @@ public:
     /**
      * A pointer to the RTP session we are associated with.
      */
-    RTPSessionImplPtr mSession;
+    SessionAdapterPtr mSession;
+
+    PJMediaTransportPtr mTransport;
 
     /**
      * Stream sink state item.
@@ -56,23 +64,13 @@ public:
     RtpStreamSinkStateItemPtr mSinkStateItem;
 };
 
-/**
- * Constructor for the StreamSinkRTPImplPriv class.
- */
-StreamSinkRTPImplPriv::StreamSinkRTPImplPriv(const RTPSessionImplPtr& session, const string& sessionId) :
-    mSession(session), mSinkStateItem(new RtpStreamSinkStateItem)
-{
-    pjmedia_rtp_session_init(&mOutgoingSession, 0, pj_rand());
-    mSinkStateItem->mSessionId = sessionId;
-    mSinkStateItem->key = IceUtil::generateUUID();
-    mSinkStateItem->mRemotePort = 0;
-};
 
 /**
  * Constructor for the StreamSinkRTPImpl class.
  */
-StreamSinkRTPImpl::StreamSinkRTPImpl(const RTPSessionImplPtr& session, const string& sessionId) :
-    mImpl(new StreamSinkRTPImplPriv(session, sessionId))
+StreamSinkRTPImpl::StreamSinkRTPImpl(const SessionAdapterPtr& sessionAdapter, const PJMediaTransportPtr& transport, 
+  const string& sessionId) :
+    mImpl(new StreamSinkRTPImplPriv(sessionAdapter, transport, sessionId))
 {
 }
 
@@ -109,8 +107,9 @@ void StreamSinkRTPImpl::write(const AsteriskSCF::Media::V1::FrameSeq& frames, co
 
         /* Using the available information construct an RTP header that we can place at the front of our packet */
         pj_status_t status = pjmedia_rtp_encode_rtp(&mImpl->mOutgoingSession,
-						    mImpl->mSession->getPayload((*frame)->mediaformat), 0, (int) (*frame)->payload.size(),
-						    (int) (*frame)->payload.size(), &header, &header_len);
+						    mImpl->mSession->getPayload((*frame)->mediaformat), 
+                                                    0, static_cast<int>((*frame)->payload.size()),
+                                                    static_cast<int>((*frame)->payload.size()), &header, &header_len);
 
         if (status != PJ_SUCCESS)
         {
@@ -128,7 +127,7 @@ void StreamSinkRTPImpl::write(const AsteriskSCF::Media::V1::FrameSeq& frames, co
         pj_memcpy(packet + header_len, &(*frame)->payload[0], (*frame)->payload.size());
 
         /* All done, transmission can now occur */
-        status = pjmedia_transport_send_rtp(mImpl->mSession->getTransport(), packet,
+        status = pjmedia_transport_send_rtp(mImpl->mTransport->getTransport(), packet,
                 (*frame)->payload.size() + header_len);
 
         if (status != PJ_SUCCESS)
@@ -144,9 +143,8 @@ void StreamSinkRTPImpl::write(const AsteriskSCF::Media::V1::FrameSeq& frames, co
  */
 void StreamSinkRTPImpl::setSource(const AsteriskSCF::Media::V1::StreamSourcePrx& source, const Ice::Current&)
 {
-    mImpl->mSinkStateItem->mSource = source;
-
-    mImpl->mSession->replicateState(0, mImpl->mSinkStateItem, 0);
+    setSourceImpl(source);
+    mImpl->mSession->replicateState(mImpl->mSinkStateItem);
 }
 
 /**
@@ -168,10 +166,10 @@ AsteriskSCF::Media::V1::FormatSeq StreamSinkRTPImpl::getFormats(const Ice::Curre
 /**
  * Implementation of the getId method as defined in MediaIf.ice
  */
-std::string StreamSinkRTPImpl::getId(const Ice::Current& current)
+std::string StreamSinkRTPImpl::getId(const Ice::Current&)
 {
     /* For now utilize the id of the session */
-    return mImpl->mSession->getId(current);
+    return mImpl->mSinkStateItem->mSessionId;
 }
 
 /**
@@ -187,10 +185,9 @@ void StreamSinkRTPImpl::setRemoteDetails(const string& address, Ice::Int port, c
     /* We do store it though in case we have not yet received a packet from the remote side but
      * are asked for the remote address. It is also stored for replication purposes.
      */
-    mImpl->mSinkStateItem->mRemoteAddress = address;
-    mImpl->mSinkStateItem->mRemotePort = port;
+    setRemoteDetailsImpl(address, port);
 
-    mImpl->mSession->replicateState(0, mImpl->mSinkStateItem, 0);
+    mImpl->mSession->replicateState(mImpl->mSinkStateItem);
 }
 
 /**
@@ -198,12 +195,7 @@ void StreamSinkRTPImpl::setRemoteDetails(const string& address, Ice::Int port, c
  */
 std::string StreamSinkRTPImpl::getRemoteAddress(const Ice::Current&)
 {
-    pjmedia_transport_info transportInfo;
-
-    pjmedia_transport_info_init(&transportInfo);
-    pjmedia_transport_get_info(mImpl->mSession->getTransport(), &transportInfo);
-
-    string address = pj_inet_ntoa(transportInfo.src_rtp_name.ipv4.sin_addr);
+    string address = mImpl->mTransport->remoteAddress()->hostname();
     return (address != "0.0.0.0") ? address : mImpl->mSinkStateItem->mRemoteAddress;
 }
 
@@ -212,12 +204,7 @@ std::string StreamSinkRTPImpl::getRemoteAddress(const Ice::Current&)
  */
 Ice::Int StreamSinkRTPImpl::getRemotePort(const Ice::Current&)
 {
-    pjmedia_transport_info transportInfo;
-
-    pjmedia_transport_info_init(&transportInfo);
-    pjmedia_transport_get_info(mImpl->mSession->getTransport(), &transportInfo);
-
-    int port = pj_ntohs(transportInfo.src_rtp_name.ipv4.sin_port);
+    int port = mImpl->mTransport->remoteAddress()->port();
     return (port != 0) ? port : mImpl->mSinkStateItem->mRemotePort;
 }
 
@@ -228,3 +215,15 @@ RtpStreamSinkStateItemPtr StreamSinkRTPImpl::getStateItem()
 {
     return mImpl->mSinkStateItem;
 }
+
+
+void StreamSinkRTPImpl::setRemoteDetailsImpl(const std::string& host, Ice::Int port)
+{
+    mImpl->mSinkStateItem->mRemoteAddress = host;
+    mImpl->mSinkStateItem->mRemotePort = port;
+}
+
+void StreamSinkRTPImpl::setSourceImpl(const AsteriskSCF::Media::V1::StreamSourcePrx& proxy)
+{
+    mImpl->mSinkStateItem->mSource = proxy;
+}
\ No newline at end of file
diff --git a/src/RTPSink.h b/src/RTPSink.h
index 16c95e3..6462306 100644
--- a/src/RTPSink.h
+++ b/src/RTPSink.h
@@ -8,7 +8,8 @@
 
 #pragma once
 
-#include "RTPSession.h"
+#include "PJMediaTransport.h"
+#include "SessionAdapter.h"
 #include <string>
 #include <AsteriskSCF/Media/MediaIf.h>
 #include <AsteriskSCF/Media/RTP/MediaRTPIf.h>
@@ -27,7 +28,8 @@ class StreamSinkRTPImplPriv;
 class StreamSinkRTPImpl : public AsteriskSCF::Media::RTP::V1::StreamSinkRTP
 {
 public:
-    StreamSinkRTPImpl(const RTPSessionImplPtr&, const std::string&);
+    StreamSinkRTPImpl(const AsteriskSCF::PJMediaRTP::SessionAdapterPtr&, 
+      const AsteriskSCF::PJMediaRTP::PJMediaTransportPtr&, const std::string&);
     void write(const AsteriskSCF::Media::V1::FrameSeq&, const Ice::Current&);
     void setSource(const AsteriskSCF::Media::V1::StreamSourcePrx&, const Ice::Current&);
     AsteriskSCF::Media::V1::StreamSourcePrx getSource(const Ice::Current&);
@@ -37,6 +39,10 @@ public:
     std::string getRemoteAddress(const Ice::Current&);
     Ice::Int getRemotePort(const Ice::Current&);
     AsteriskSCF::Media::RTP::V1::RtpStreamSinkStateItemPtr getStateItem();
+
+    void setRemoteDetailsImpl(const std::string&, Ice::Int);
+    void setSourceImpl(const AsteriskSCF::Media::V1::StreamSourcePrx&);
+
 private:
     /**
      * Private implementation data for StreamSinkRTPImpl.
@@ -44,4 +50,4 @@ private:
     boost::shared_ptr<StreamSinkRTPImplPriv> mImpl;
 };
 
-typedef IceUtil::Handle<StreamSinkRTPImpl> StreamSinkRTPImplPtr;
\ No newline at end of file
+typedef IceUtil::Handle<StreamSinkRTPImpl> StreamSinkRTPImplPtr;
diff --git a/src/RTPSource.cpp b/src/RTPSource.cpp
index 8160a4e..33c64e3 100644
--- a/src/RTPSource.cpp
+++ b/src/RTPSource.cpp
@@ -15,7 +15,6 @@
  */
 
 #include "RTPSource.h"
-#include "RTPSession.h"
 
 #include <Ice/Ice.h>
 #include <IceUtil/UUID.h>
@@ -30,6 +29,7 @@ using namespace AsteriskSCF::Core::Discovery::V1;
 using namespace AsteriskSCF::Media::V1;
 using namespace AsteriskSCF::Media::RTP::V1;
 using namespace AsteriskSCF::System::Logging;
+using namespace AsteriskSCF::PJMediaRTP;
 
 namespace
 {
@@ -45,7 +45,15 @@ public:
     /**
      * Constructor for our StreamSourceRTPImplPriv class.
      */
-    StreamSourceRTPImplPriv(const RTPSessionImplPtr&, const string&);
+    StreamSourceRTPImplPriv(const SessionAdapterPtr& sessionAdapter, const PJMediaTransportPtr& transport, 
+      const string& sessionId) :
+        mSession(sessionAdapter), mTransport(transport), 
+        mSourceStateItem(new RtpStreamSourceStateItem)
+    {
+        pjmedia_rtp_session_init(&mIncomingSession, 0, 0);
+        mSourceStateItem->mSessionId = sessionId;
+        mSourceStateItem->key = IceUtil::generateUUID();
+    }
 
     /**
      * A structure containing incoming pjmedia session data.
@@ -55,7 +63,9 @@ public:
     /**
      * A pointer to the RTP session we are associated with.
      */
-    RTPSessionImplPtr mSession;
+    SessionAdapterPtr mSession;
+
+    PJMediaTransportPtr mTransport;
 
     /**
      * Stream source state item.
@@ -64,21 +74,11 @@ public:
 };
 
 /**
- * Constructor for the StreamSourceRTPImplPriv class.
- */
-StreamSourceRTPImplPriv::StreamSourceRTPImplPriv(const RTPSessionImplPtr& session, const string& sessionId) :
-    mSession(session), mSourceStateItem(new RtpStreamSourceStateItem)
-{
-    pjmedia_rtp_session_init(&mIncomingSession, 0, 0);
-    mSourceStateItem->mSessionId = sessionId;
-    mSourceStateItem->key = IceUtil::generateUUID();
-};
-
-/**
  * Constructor for the StreamSourceRTPImpl class.
  */
-StreamSourceRTPImpl::StreamSourceRTPImpl(const RTPSessionImplPtr& session, const string& sessionId) :
-    mImpl(new StreamSourceRTPImplPriv(session, sessionId))
+StreamSourceRTPImpl::StreamSourceRTPImpl(const SessionAdapterPtr& session, const PJMediaTransportPtr& transport, 
+    const string& sessionId) :
+    mImpl(new StreamSourceRTPImplPriv(session, transport, sessionId))
 {
 }
 
@@ -87,9 +87,8 @@ StreamSourceRTPImpl::StreamSourceRTPImpl(const RTPSessionImplPtr& session, const
  */
 void StreamSourceRTPImpl::setSink(const AsteriskSCF::Media::V1::StreamSinkPrx& sink, const Ice::Current&)
 {
-    mImpl->mSourceStateItem->mSink = sink;
-
-    mImpl->mSession->replicateState(0, 0, mImpl->mSourceStateItem);
+    setSinkImpl(sink);
+    mImpl->mSession->replicateState(mImpl->mSourceStateItem);
 }
 
 /**
@@ -111,10 +110,10 @@ AsteriskSCF::Media::V1::FormatSeq StreamSourceRTPImpl::getFormats(const Ice::Cur
 /**
  * Implementation of the getId method as defined in MediaRTPIf.ice
  */
-std::string StreamSourceRTPImpl::getId(const Ice::Current& current)
+std::string StreamSourceRTPImpl::getId(const Ice::Current&)
 {
     /* For now utilize the id of the session */
-    return mImpl->mSession->getId(current);
+    return mImpl->mSourceStateItem->mSessionId;
 }
 
 /**
@@ -131,12 +130,7 @@ void StreamSourceRTPImpl::requestFormat(const AsteriskSCF::Media::V1::FormatPtr&
  */
 std::string StreamSourceRTPImpl::getLocalAddress(const Ice::Current&)
 {
-    pjmedia_transport_info transportInfo;
-
-    pjmedia_transport_info_init(&transportInfo);
-    pjmedia_transport_get_info(mImpl->mSession->getTransport(), &transportInfo);
-
-    return pj_inet_ntoa(transportInfo.sock_info.rtp_addr_name.ipv4.sin_addr);
+    return mImpl->mTransport->localAddress()->hostname();
 }
... 730 lines suppressed ...


-- 
asterisk-scf/integration/media_rtp_pjmedia.git



More information about the asterisk-scf-commits mailing list