[asterisk-scf-commits] asterisk-scf/integration/media_rtp_pjmedia.git branch "replication" updated.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Wed Dec 8 12:18:30 CST 2010


branch "replication" has been updated
       via  c19ee91ad54021f1842572f83640587604aa4266 (commit)
       via  c4eeea22d39f6cd45aec0390d8c80b30f82cf21e (commit)
       via  290ce3eb5f17c72163efaed78152213c9a5473f6 (commit)
       via  32737285dafb66a5e9040f883204da51017d2498 (commit)
      from  fa399cca8fa1687c155e2513a27cffffe435192a (commit)

Summary of changes:
 config/test_component.config.in    |    2 +
 config/test_media_rtp_pjmedia.conf |    1 +
 src/CMakeLists.txt                 |    9 ++-
 src/MediaRTPpjmedia.cpp            |  174 +++++++++++++++++++++++++++++++++---
 src/RTPSession.cpp                 |   50 ++++++++++-
 src/RTPSession.h                   |    3 +-
 src/RTPSink.cpp                    |   10 ++
 src/RTPSource.cpp                  |   10 ++
 src/RtpStateReplicator.h           |    2 +-
 src/RtpStateReplicatorApp.cpp      |    2 +
 src/RtpStateReplicatorListener.cpp |   11 ++-
 11 files changed, 250 insertions(+), 24 deletions(-)


- Log -----------------------------------------------------------------
commit c19ee91ad54021f1842572f83640587604aa4266
Author: Joshua Colp <jcolp at digium.com>
Date:   Wed Dec 8 14:16:55 2010 -0400

    Fix misspelled configuration option, and add it to another config.

diff --git a/config/test_component.config.in b/config/test_component.config.in
index ba5d7c9..cf051a4 100644
--- a/config/test_component.config.in
+++ b/config/test_component.config.in
@@ -16,7 +16,7 @@ IceBox.Service.MediaRTPpjmedia=../src at media_rtp_pjmedia:create
 # Adapter parameters for this component
 MediaRTPpjmediaAdapter.Endpoints=default
 
-MediaRTPPjmediaAdapterLocal.Endpoints=default
+MediaRTPpjmediaAdapterLocal.Endpoints=default
 
 # A proxy to the service locator management service
 ServiceLocatorManagementProxy=LocatorServiceManagement:tcp -p 4422
diff --git a/config/test_media_rtp_pjmedia.conf b/config/test_media_rtp_pjmedia.conf
index 38feba5..2d75ff3 100644
--- a/config/test_media_rtp_pjmedia.conf
+++ b/config/test_media_rtp_pjmedia.conf
@@ -4,6 +4,7 @@ IceBox.Service.MediaRTPpjmedia=media_rtp_pjmedia:create
 
 # Adapter parameters for this component
 MediaRTPpjmediaAdapter.Endpoints=default
+MediaRTPpjmediaAdapterLocal.Endpoints=default
 
 # A proxy to the service locator management service
 ServiceLocatorManagementProxy=LocatorServiceManagement:tcp -p 4422

commit c4eeea22d39f6cd45aec0390d8c80b30f82cf21e
Author: Joshua Colp <jcolp at digium.com>
Date:   Wed Dec 8 14:15:09 2010 -0400

    Add missing getStateItem implementations.

diff --git a/src/RTPSink.cpp b/src/RTPSink.cpp
index a4511a8..ed63b32 100644
--- a/src/RTPSink.cpp
+++ b/src/RTPSink.cpp
@@ -221,3 +221,11 @@ Ice::Int StreamSinkRTPImpl::getRemotePort(const Ice::Current&)
     int port = pj_ntohs(transportInfo.src_rtp_name.ipv4.sin_port);
     return (port != 0) ? port : mImpl->mSinkStateItem->mRemotePort;
 }
+
+/**
+ * API call which returns a pointer to the sink state item.
+ */
+RtpStreamSinkStateItemPtr StreamSinkRTPImpl::getStateItem()
+{
+    return mImpl->mSinkStateItem;
+}
diff --git a/src/RTPSource.cpp b/src/RTPSource.cpp
index 1083e2c..bb16518 100644
--- a/src/RTPSource.cpp
+++ b/src/RTPSource.cpp
@@ -240,3 +240,11 @@ void StreamSourceRTPImpl::setRemoteDetails(std::string address, int port)
         /* TODO: Decide what to do if this occurs, do we need an exception? */
     }
 }
+
+/**
+ * API call which returns a pointer to the source state item.
+ */
+RtpStreamSourceStateItemPtr StreamSourceRTPImpl::getStateItem()
+{
+    return mImpl->mSourceStateItem;
+}

commit 290ce3eb5f17c72163efaed78152213c9a5473f6
Author: Joshua Colp <jcolp at digium.com>
Date:   Wed Dec 8 14:14:56 2010 -0400

    Link against the Rtp state replication interface.

diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index d3afc0e..96f4152 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -16,6 +16,7 @@ asterisk_scf_component_add_slice(media_rtp_pjmedia MediaIf)
 asterisk_scf_component_add_slice(media_rtp_pjmedia MediaRTPIf)
 asterisk_scf_component_add_slice(media_rtp_pjmedia ComponentServiceIf)
 asterisk_scf_component_add_slice(media_rtp_pjmedia ReplicaIf)
+asterisk_scf_component_add_slice(media_rtp_pjmedia RtpStateReplicationIf)
 asterisk_scf_component_add_file(media_rtp_pjmedia MediaRTPpjmedia.cpp)
 asterisk_scf_component_add_file(media_rtp_pjmedia RTPSession.cpp)
 asterisk_scf_component_add_file(media_rtp_pjmedia RTPSource.cpp)

commit 32737285dafb66a5e9040f883204da51017d2498
Author: Joshua Colp <jcolp at digium.com>
Date:   Wed Dec 8 14:09:06 2010 -0400

    Everything should now be talking to the state replicator application. Next up though is regression testing.

diff --git a/config/test_component.config.in b/config/test_component.config.in
index 91c4165..ba5d7c9 100644
--- a/config/test_component.config.in
+++ b/config/test_component.config.in
@@ -16,6 +16,8 @@ IceBox.Service.MediaRTPpjmedia=../src at media_rtp_pjmedia:create
 # Adapter parameters for this component
 MediaRTPpjmediaAdapter.Endpoints=default
 
+MediaRTPPjmediaAdapterLocal.Endpoints=default
+
 # A proxy to the service locator management service
 ServiceLocatorManagementProxy=LocatorServiceManagement:tcp -p 4422
 
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index a4d07c9..d3afc0e 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -6,11 +6,16 @@
 # All rights reserved.
 #
 
+
+include_directories(${utils_dir}/StateReplicator/src)
+include_directories(${utils_dir}/SmartProxy/src)
+
 asterisk_scf_component_init(media_rtp_pjmedia CXX)
 asterisk_scf_component_add_slice(media_rtp_pjmedia ServiceLocatorIf)
 asterisk_scf_component_add_slice(media_rtp_pjmedia MediaIf)
 asterisk_scf_component_add_slice(media_rtp_pjmedia MediaRTPIf)
 asterisk_scf_component_add_slice(media_rtp_pjmedia ComponentServiceIf)
+asterisk_scf_component_add_slice(media_rtp_pjmedia ReplicaIf)
 asterisk_scf_component_add_file(media_rtp_pjmedia MediaRTPpjmedia.cpp)
 asterisk_scf_component_add_file(media_rtp_pjmedia RTPSession.cpp)
 asterisk_scf_component_add_file(media_rtp_pjmedia RTPSource.cpp)
@@ -45,9 +50,6 @@ asterisk_scf_component_add_slice(RtpStateReplicator RtpStateReplicationIf)
 asterisk_scf_component_add_slice(RtpStateReplicator RoutingIf)
 asterisk_scf_component_add_slice(RtpStateReplicator MediaRTPIf)
 
-include_directories(${utils_dir}/StateReplicator/src)
-include_directories(${utils_dir}/SmartProxy/src)
-
 asterisk_scf_component_add_file(RtpStateReplicator RtpStateReplicatorApp.cpp)
 asterisk_scf_component_add_file(RtpStateReplicator RtpStateReplicator.h)
 
diff --git a/src/MediaRTPpjmedia.cpp b/src/MediaRTPpjmedia.cpp
index 7397bfc..1b2db05 100644
--- a/src/MediaRTPpjmedia.cpp
+++ b/src/MediaRTPpjmedia.cpp
@@ -29,8 +29,11 @@
 #include "IceLogger.h"
 #include "logger.h"
 #include "RtpStateReplicationIf.h"
+#include "ReplicaIf.h"
+#include "SmartProxy.h"
 
 #include "RTPSession.h"
+#include "RtpStateReplicator.h"
 
 using namespace std;
 using namespace AsteriskSCF::Core::Discovery::V1;
@@ -38,20 +41,24 @@ using namespace AsteriskSCF::Media::V1;
 using namespace AsteriskSCF::Media::RTP::V1;
 using namespace AsteriskSCF::System::Component::V1;
 using namespace AsteriskSCF::System::Logging;
+using namespace AsteriskSCF::SmartProxy;
 
 namespace
 {
 Logger &lg = getLoggerFactory().getLogger("AsteriskSCF.MediaRTP");
 }
 
+static const string ReplicaServiceId("MediaRtpReplica");
+
 /**
  * Implementation of the RTPMediaService interface as defined in MediaRTPIf.ice
  */
 class RTPMediaServiceImpl : public RTPMediaService
 {
 public:
-    RTPMediaServiceImpl(Ice::ObjectAdapterPtr);
+    RTPMediaServiceImpl(Ice::ObjectAdapterPtr, ReplicaPtr, AsteriskSCF::SmartProxy::SmartProxy<RtpStateReplicatorPrx>);
     RTPSessionPrx allocate(const FormatSeq&, const Ice::Current&);
+    pj_pool_factory *getPoolFactory() { return &mCachingPool.factory; };
 private:
     /**
      * A pointer to the object adapter that objects should be added to.
@@ -67,6 +74,82 @@ private:
      * Memory pool.
      */
     pj_pool_t* mMemoryPool;
+
+    /**
+     * A pointer to the replica service.
+     */
+    ReplicaPtr mReplicaService;
+
+    /**
+     * A proxy to the state replicator.
+     */
+    AsteriskSCF::SmartProxy::SmartProxy<RtpStateReplicatorPrx> mStateReplicator;
+};
+
+/**
+ * Typedef which gives us a smart pointer type for RTPMediaServiceImpl class.
+ */
+typedef IceUtil::Handle<RTPMediaServiceImpl> RTPMediaServiceImplPtr;
+
+/**                                                                                                                                                                                                                                          
+ * This class provides implementation for the Replica interface.                                                                                                                                                                             
+ */
+class ReplicaImpl : public Replica
+{
+public:
+    ReplicaImpl(Ice::ObjectAdapterPtr adapter) : mAdapter(adapter), mPaused(false), mActive(true) { }
+
+    bool isActive(const Ice::Current&)
+    {
+        return mActive;
+    }
+
+    bool activate(const Ice::Current&)
+    {
+        mActive = true;
+
+        for (vector<AsteriskSCF::System::Component::V1::ReplicaListenerPrx>::const_iterator listener = mListeners.begin(); listener != mListeners.end(); ++listener)
+        {
+            (*listener)->activated(ReplicaPrx::uncheckedCast(mAdapter->createDirectProxy(mAdapter->getCommunicator()->stringToIdentity(ReplicaServiceId))));
+        }
+
+        return true;
+    }
+
+    void standby(const Ice::Current&)
+    {
+        mActive = false;
+
+        for (vector<AsteriskSCF::System::Component::V1::ReplicaListenerPrx>::const_iterator listener = mListeners.begin(); listener != mListeners.end(); ++listener)
+        {
+            (*listener)->onStandby(ReplicaPrx::uncheckedCast(mAdapter->createDirectProxy(mAdapter->getCommunicator()->stringToIdentity(ReplicaServiceId))));
+        }
+    }
+
+    void addListener(const AsteriskSCF::System::Component::V1::ReplicaListenerPrx& listener, const Ice::Current&)
+    {
+        mListeners.push_back(listener);
+    }
+
+    void removeListener(const AsteriskSCF::System::Component::V1::ReplicaListenerPrx& listener, const Ice::Current&)
+    {
+        mListeners.erase(std::remove(mListeners.begin(), mListeners.end(), listener), mListeners.end());
+    }
+
+private:
+    /**                                                                                                                                                                                                                                      
+     * Pointer to the object adapter we exist on.                                                                                                                                                                                            
+     */
+    Ice::ObjectAdapterPtr mAdapter;
+
+    /**                                                                                                                                                                                                                                      
+     * Listeners that we need to push state change notifications out to.                                                                                                                                                                     
+     */
+    vector<AsteriskSCF::System::Component::V1::ReplicaListenerPrx> mListeners;
+
+    bool mPaused;
+
+    bool mActive;
 };
 
 /**
@@ -85,9 +168,14 @@ private:
     Ice::CommunicatorPtr mCommunicator;
 
     /**
-     * Object adapter that everything is associated with.
+     * Object adapter that global stuff is associated with.
      */
-    Ice::ObjectAdapterPtr mAdapter;
+    Ice::ObjectAdapterPtr mGlobalAdapter;
+
+    /**
+     * Object adapter that local stuff is associated with.
+     */
+    Ice::ObjectAdapterPtr mLocalAdapter;
 
     /**
      * The object adapter for the Logger.
@@ -103,6 +191,26 @@ private:
      * A proxy to the service locator manager for the component service.
      */
     ServiceManagementPrx mComponentServiceManagement;
+
+    /**
+     * Instance of our replica implementation.
+     */
+    ReplicaPtr mReplicaService;
+
+    /**
+     * Instance of our state replicator listener.
+     */
+    RtpStateReplicatorListenerPtr mReplicatorListener;
+
+    /**
+     * A proxy to our state replicator listener.
+     */
+    RtpStateReplicatorListenerPrx mReplicatorListenerProxy;
+
+    /**
+     * A proxy to the state replicator.
+     */
+    AsteriskSCF::SmartProxy::SmartProxy<RtpStateReplicatorPrx> mStateReplicator;
 };
 
 /**
@@ -210,7 +318,8 @@ private:
 /**
  * Constructor for the RTPMediaServiceImpl class.
  */
-RTPMediaServiceImpl::RTPMediaServiceImpl(Ice::ObjectAdapterPtr adapter) : mAdapter(adapter)
+RTPMediaServiceImpl::RTPMediaServiceImpl(Ice::ObjectAdapterPtr adapter, ReplicaPtr replicaService, AsteriskSCF::SmartProxy::SmartProxy<RtpStateReplicatorPrx> stateReplicator) :
+    mAdapter(adapter), mReplicaService(replicaService), mStateReplicator(stateReplicator)
 {
     /* Initialize the memory caching pool using default policy as specified by pjlib. */
     pj_caching_pool_init(&mCachingPool, &pj_pool_factory_default_policy, 0);
@@ -224,7 +333,7 @@ RTPMediaServiceImpl::RTPMediaServiceImpl(Ice::ObjectAdapterPtr adapter) : mAdapt
  */
 RTPSessionPrx RTPMediaServiceImpl::allocate(const FormatSeq& formats, const Ice::Current&)
 {
-    RTPSessionImplPtr session = new RTPSessionImpl(mAdapter, formats, &mCachingPool.factory);
+    RTPSessionImplPtr session = new RTPSessionImpl(mAdapter, formats, &mCachingPool.factory, mReplicaService, mStateReplicator);
     return session->getProxy();
 }
 
@@ -263,23 +372,24 @@ void MediaRTPpjmediaApp::start(const std::string& name, const Ice::CommunicatorP
 
     mCommunicator = Ice::initialize(id);
 
-    mAdapter = mCommunicator->createObjectAdapter("MediaRTPpjmediaAdapter");
+    mLocalAdapter = mCommunicator->createObjectAdapter("MediaRTPpjmediaAdapterLocal");
+
+    mReplicaService = new ReplicaImpl(mLocalAdapter);
+    mLocalAdapter->add(mReplicaService, mCommunicator->stringToIdentity(ReplicaServiceId));
 
-    RTPMediaServicePtr rtpmediaservice = new RTPMediaServiceImpl(mAdapter);
+    mLocalAdapter->activate();
 
-    RTPMediaServicePrx RTPMediaServiceProxy = RTPMediaServicePrx::uncheckedCast(mAdapter->addWithUUID(rtpmediaservice));
+    mGlobalAdapter = mCommunicator->createObjectAdapter("MediaRTPpjmediaAdapter");
 
-    mAdapter->activate();
+    mGlobalAdapter->activate();
 
     lg(Info) << "Activated pjmedia rtp component media service." << endl;
 
     ServiceLocatorManagementPrx management = ServiceLocatorManagementPrx::checkedCast(mCommunicator->propertyToProxy("ServiceLocatorManagementProxy"));
 
-    mServiceManagement = ServiceManagementPrx::uncheckedCast(management->addService(RTPMediaServiceProxy, "media_rtp_pjmedia"));
-
     /* One must provide a component service to manage us, if someone wants to */
     ComponentServicePtr ComponentService = new ComponentServiceImpl(*this, mServiceManagement);
-    ComponentServicePrx ComponentServiceProxy = ComponentServicePrx::uncheckedCast(mAdapter->addWithUUID(ComponentService));
+    ComponentServicePrx ComponentServiceProxy = ComponentServicePrx::uncheckedCast(mLocalAdapter->addWithUUID(ComponentService));
 
     /* Let's add the component service to the service locator first */
     mComponentServiceManagement = ServiceManagementPrx::uncheckedCast(management->addService(ComponentServiceProxy, "media_rtp_pjmedia"));
@@ -287,6 +397,46 @@ void MediaRTPpjmediaApp::start(const std::string& name, const Ice::CommunicatorP
     genericparams->category = "Component/media_rtp_pjmedia";
     mComponentServiceManagement->addLocatorParams(genericparams, "");
 
+    // The service locator is required for state replicator operation, so go ahead and find it
+    ServiceLocatorPrx locator = ServiceLocatorPrx::checkedCast(mCommunicator->propertyToProxy("LocatorService.Proxy"));
+
+    // Look for the configured state replicator or default one
+    RtpStateReplicatorParamsPtr replicatorParams = new RtpStateReplicatorParams();
+    replicatorParams->category = StateReplicatorDiscoveryCategory;
+    replicatorParams->mName = mCommunicator->getProperties()->getPropertyWithDefault("Sip.StateReplicatorName", "default");
+
+    try
+    {  
+	AsteriskSCF::SmartProxy::SmartProxy<RtpStateReplicatorPrx> pw(locator, replicatorParams, lg);
+        mStateReplicator = pw;
+    }
+    catch (...)
+    {
+        lg(Error) << "State replicator could not be found, operating without.";
+    }
+
+    RTPMediaServiceImplPtr rtpmediaservice = new RTPMediaServiceImpl(mGlobalAdapter, mReplicaService, mStateReplicator);
+
+    if (mStateReplicator)
+    {
+        mReplicatorListener = new RtpStateReplicatorListenerI(mLocalAdapter, rtpmediaservice->getPoolFactory());
+        mReplicatorListenerProxy = RtpStateReplicatorListenerPrx::uncheckedCast(mLocalAdapter->addWithUUID(mReplicatorListener));
+
+	if (mCommunicator->getProperties()->getPropertyWithDefault("Rtp.StateReplicatorListener", "no") == "yes")
+	{
+	    mStateReplicator->addListener(mReplicatorListenerProxy);
+	    mReplicaService->standby();
+	    lg(Info) << "Operating as a standby replica." << endl;
+	}
+	else
+	{
+	    lg(Info) << "Operating in an active state." << endl;
+	}
+    }
+
+    RTPMediaServicePrx RTPMediaServiceProxy = RTPMediaServicePrx::uncheckedCast(mGlobalAdapter->addWithUUID(rtpmediaservice));
+
+    mServiceManagement = ServiceManagementPrx::uncheckedCast(management->addService(RTPMediaServiceProxy, "media_rtp_pjmedia"));
     /* Now we can add some parameters to help find us. */
     genericparams->category = "rtp";
     mServiceManagement->addLocatorParams(genericparams, "");
diff --git a/src/RTPSession.cpp b/src/RTPSession.cpp
index 134ec5f..a3d1543 100644
--- a/src/RTPSession.cpp
+++ b/src/RTPSession.cpp
@@ -23,6 +23,8 @@
 #include "Media/MediaIf.h"
 #include "Media/RTP/MediaRTPIf.h"
 #include "RtpStateReplicationIf.h"
+#include "ReplicaIf.h"
+#include "SmartProxy.h"
 
 #include "RTPSession.h"
 #include "RTPSource.h"
@@ -32,6 +34,8 @@ using namespace std;
 using namespace AsteriskSCF::Core::Discovery::V1;
 using namespace AsteriskSCF::Media::V1;
 using namespace AsteriskSCF::Media::RTP::V1;
+using namespace AsteriskSCF::System::Component::V1;
+using namespace AsteriskSCF::SmartProxy;
 
 /**
  * Default value for where we should start allocating RTP and RTCP ports from.
@@ -49,7 +53,8 @@ using namespace AsteriskSCF::Media::RTP::V1;
 class RTPSessionImplPriv
 {
 public:
-    RTPSessionImplPriv(Ice::ObjectAdapterPtr adapter, const FormatSeq& formats) : mAdapter(adapter), mFormats(formats), mSessionStateItem(new RtpSessionStateItem()) { };
+    RTPSessionImplPriv(Ice::ObjectAdapterPtr adapter, const FormatSeq& formats, ReplicaPtr replicaService, AsteriskSCF::SmartProxy::SmartProxy<RtpStateReplicatorPrx> stateReplicator) :
+	mAdapter(adapter), mFormats(formats), mSessionStateItem(new RtpSessionStateItem()), mReplicaService(replicaService), mStateReplicator(stateReplicator) { };
     ~RTPSessionImplPriv();
 
     /**
@@ -106,12 +111,23 @@ public:
      * Session state item.
      */
     RtpSessionStateItemPtr mSessionStateItem;
+
+    /**
+     * A pointer to the replica instance.
+     */
+    ReplicaPtr mReplicaService;
+
+    /**
+     * A proxy to the state replicator where we are sending updates to.
+     */
+    AsteriskSCF::SmartProxy::SmartProxy<RtpStateReplicatorPrx> mStateReplicator;
 };
 
 /**
  * Constructor for the RTPSessionImpl class (used by Ice).
  */
-RTPSessionImpl::RTPSessionImpl(Ice::ObjectAdapterPtr adapter, const FormatSeq& formats, pj_pool_factory* factory) : mImpl(new RTPSessionImplPriv(adapter, formats))
+RTPSessionImpl::RTPSessionImpl(Ice::ObjectAdapterPtr adapter, const FormatSeq& formats, pj_pool_factory* factory, ReplicaPtr replicaService, AsteriskSCF::SmartProxy::SmartProxy<RtpStateReplicatorPrx> stateReplicator) : 
+    mImpl(new RTPSessionImplPriv(adapter, formats, replicaService, stateReplicator))
 {
     /* Add ourselves to the ICE ASM so we can be used. */
     mImpl->mProxy = RTPSessionPrx::uncheckedCast(adapter->addWithUUID(this));
@@ -157,7 +173,7 @@ RTPSessionImpl::RTPSessionImpl(Ice::ObjectAdapterPtr adapter, const FormatSeq& f
  * Constructor for the RTPSessionImpl class (used by state replicator).
  */
 RTPSessionImpl::RTPSessionImpl(Ice::ObjectAdapterPtr adapter, pj_pool_factory* factory, Ice::Identity sessionIdentity, Ice::Identity sinkIdentity,
-    Ice::Identity sourceIdentity, Ice::Int port, const FormatSeq& formats) : mImpl(new RTPSessionImplPriv(adapter, formats))
+    Ice::Identity sourceIdentity, Ice::Int port, const FormatSeq& formats) : mImpl(new RTPSessionImplPriv(adapter, formats, 0, *(new AsteriskSCF::SmartProxy::SmartProxy<RtpStateReplicatorPrx>)))
 {
     mImpl->mProxy = RTPSessionPrx::uncheckedCast(adapter->add(this, sessionIdentity));
 
@@ -392,6 +408,20 @@ void RTPSessionImpl::replicateState(RtpSessionStateItemPtr session, RtpStreamSin
     {
 	items.push_back(source);
     }
+
+    if (items.size() == 0)
+    {
+	return;
+    }
+
+    try
+    {
+	RtpStateReplicatorPrx oneway = RtpStateReplicatorPrx::uncheckedCast(mImpl->mStateReplicator->ice_oneway());
+	oneway->setState(items);
+    }
+    catch (...)
+    {
+    }
 }
 
 /**
@@ -417,4 +447,18 @@ void RTPSessionImpl::removeState(RtpSessionStateItemPtr session, RtpStreamSinkSt
     {
 	items.push_back(source->key);
     }
+
+    if (items.size() == 0)
+    {
+	return;
+    }
+
+    try
+    {
+	RtpStateReplicatorPrx oneway = RtpStateReplicatorPrx::uncheckedCast(mImpl->mStateReplicator->ice_oneway());
+	oneway->removeState(items);
+    }
+    catch (...)
+    {
+    }
 }
diff --git a/src/RTPSession.h b/src/RTPSession.h
index 005a054..a76b610 100644
--- a/src/RTPSession.h
+++ b/src/RTPSession.h
@@ -41,7 +41,8 @@ typedef IceUtil::Handle<StreamSourceRTPImpl> StreamSourceRTPImplPtr;
 class RTPSessionImpl : public AsteriskSCF::Media::RTP::V1::RTPSession
 {
 public:
-    RTPSessionImpl(Ice::ObjectAdapterPtr, const AsteriskSCF::Media::V1::FormatSeq&, pj_pool_factory*);
+    RTPSessionImpl(Ice::ObjectAdapterPtr, const AsteriskSCF::Media::V1::FormatSeq&, pj_pool_factory*, AsteriskSCF::System::Component::V1::ReplicaPtr, 
+    		   AsteriskSCF::SmartProxy::SmartProxy<AsteriskSCF::Media::RTP::V1::RtpStateReplicatorPrx>);
     RTPSessionImpl(Ice::ObjectAdapterPtr, pj_pool_factory*, Ice::Identity, Ice::Identity, Ice::Identity, Ice::Int, const AsteriskSCF::Media::V1::FormatSeq&);
     AsteriskSCF::Media::V1::StreamSourceSeq getSources(const Ice::Current&);
     AsteriskSCF::Media::V1::StreamSinkSeq getSinks(const Ice::Current&);
diff --git a/src/RTPSink.cpp b/src/RTPSink.cpp
index c07c251..a4511a8 100644
--- a/src/RTPSink.cpp
+++ b/src/RTPSink.cpp
@@ -23,6 +23,8 @@
 #include "Media/MediaIf.h"
 #include "Media/RTP/MediaRTPIf.h"
 #include "RtpStateReplicationIf.h"
+#include "ReplicaIf.h"
+#include "SmartProxy.h"
 
 #include "RTPSession.h"
 #include "RTPSink.h"
diff --git a/src/RTPSource.cpp b/src/RTPSource.cpp
index 1b8e14e..1083e2c 100644
--- a/src/RTPSource.cpp
+++ b/src/RTPSource.cpp
@@ -23,6 +23,8 @@
 #include "Media/MediaIf.h"
 #include "Media/RTP/MediaRTPIf.h"
 #include "RtpStateReplicationIf.h"
+#include "ReplicaIf.h"
+#include "SmartProxy.h"
 
 #include "RTPSession.h"
 #include "RTPSource.h"
diff --git a/src/RtpStateReplicator.h b/src/RtpStateReplicator.h
index 4741f2b..2e57db7 100644
--- a/src/RtpStateReplicator.h
+++ b/src/RtpStateReplicator.h
@@ -28,7 +28,7 @@ typedef IceUtil::Handle<RtpStateReplicatorI> RtpStateReplicatorIPtr;
 class RtpStateReplicatorListenerI : public RtpStateReplicatorListener
 {
 public:
-    RtpStateReplicatorListenerI();
+    RtpStateReplicatorListenerI(Ice::ObjectAdapterPtr, pj_pool_factory*);
     ~RtpStateReplicatorListenerI();
     void stateRemoved(const Ice::StringSeq&, const Ice::Current&);
     void stateSet(const RtpStateItemSeq&, const Ice::Current&);
diff --git a/src/RtpStateReplicatorApp.cpp b/src/RtpStateReplicatorApp.cpp
index 7d78c07..8fffef6 100644
--- a/src/RtpStateReplicatorApp.cpp
+++ b/src/RtpStateReplicatorApp.cpp
@@ -19,6 +19,8 @@
 #include <IceStorm/IceStorm.h>
 #include <IceBox/IceBox.h>
 
+#include <pjlib.h>
+
 #include "ServiceLocatorIf.h"
 #include "ComponentServiceIf.h"
 #include "RtpStateReplicator.h"
diff --git a/src/RtpStateReplicatorListener.cpp b/src/RtpStateReplicatorListener.cpp
index 0686367..423f8ef 100644
--- a/src/RtpStateReplicatorListener.cpp
+++ b/src/RtpStateReplicatorListener.cpp
@@ -22,6 +22,9 @@
 #include <pjlib.h>
 #include <pjmedia.h>
 
+#include "ReplicaIf.h"
+#include "SmartProxy.h"
+
 #include "RtpStateReplicator.h"
 #include "RTPSession.h"
 #include "RTPSink.h"
@@ -52,8 +55,8 @@ private:
 struct RtpStateReplicatorListenerImpl
 {
 public:
-    RtpStateReplicatorListenerImpl()
-        : mId(IceUtil::generateUUID()) {}
+    RtpStateReplicatorListenerImpl(Ice::ObjectAdapterPtr adapter, pj_pool_factory *poolFactory)
+        : mId(IceUtil::generateUUID()), mAdapter(adapter), mPoolFactory(poolFactory) {}
     void removeStateNoticeImpl(const Ice::StringSeq& itemKeys)
     {
         for (Ice::StringSeq::const_iterator key = itemKeys.begin(); key != itemKeys.end(); ++key)
@@ -118,8 +121,8 @@ public:
     pj_pool_factory *mPoolFactory;
 };
 
-RtpStateReplicatorListenerI::RtpStateReplicatorListenerI()
-    : mImpl(new RtpStateReplicatorListenerImpl()) {}
+RtpStateReplicatorListenerI::RtpStateReplicatorListenerI(Ice::ObjectAdapterPtr adapter, pj_pool_factory *poolFactory)
+    : mImpl(new RtpStateReplicatorListenerImpl(adapter, poolFactory)) {}
 
 RtpStateReplicatorListenerI::~RtpStateReplicatorListenerI()
 {

-----------------------------------------------------------------------


-- 
asterisk-scf/integration/media_rtp_pjmedia.git



More information about the asterisk-scf-commits mailing list