[asterisk-scf-commits] asterisk-scf/integration/media_rtp_pjmedia.git branch "replication" updated.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Thu Dec 9 13:42:09 CST 2010


branch "replication" has been updated
       via  59bd7af369b179b4c6135d92ac7876d992402571 (commit)
       via  346b63d3c7a0621691dd6dcb33c0d9e6067b4845 (commit)
      from  6534181ea193e0e6c614ea14a71f10c84cbeed5a (commit)

Summary of changes:
 local-slice/RtpStateReplicationIf.ice |    5 ++
 src/MediaRTPpjmedia.cpp               |   75 ++++++++++++++++++++------------
 src/RTPSource.cpp                     |    2 +-
 src/RtpStateReplicator.h              |    2 +-
 src/RtpStateReplicatorListener.cpp    |   16 +++++--
 5 files changed, 65 insertions(+), 35 deletions(-)


- Log -----------------------------------------------------------------
commit 59bd7af369b179b4c6135d92ac7876d992402571
Author: Joshua Colp <jcolp at digium.com>
Date:   Thu Dec 9 15:40:02 2010 -0400

    Use a predefined name for the RTP media service, and don't register with the service locator unless active.

diff --git a/src/MediaRTPpjmedia.cpp b/src/MediaRTPpjmedia.cpp
index 42ecceb..05801c5 100644
--- a/src/MediaRTPpjmedia.cpp
+++ b/src/MediaRTPpjmedia.cpp
@@ -50,6 +50,7 @@ Logger &lg = getLoggerFactory().getLogger("AsteriskSCF.MediaRTP");
 }
 
 static const string ReplicaServiceId("MediaRtpReplica");
+static const string MediaServiceId("RTPMediaService");
 
 /**
  * Implementation of the RTPMediaService interface as defined in MediaRTPIf.ice
@@ -224,7 +225,7 @@ public:
     /**
      * A constructor for this implementation which just sets a few variables, nothing extreme.
      */
-    ComponentServiceImpl(MediaRTPpjmediaApp& app, ServiceManagementPrx& management) : mApplication(app), mManagement(management) { };
+    ComponentServiceImpl(MediaRTPpjmediaApp& app, RtpGeneralStateItemPtr generalState) : mApplication(app), mGeneralState(generalState) { };
 
     /**
      * An implementation of the suspend method which actually suspends ourselves
@@ -232,7 +233,7 @@ public:
      */
     virtual void suspend(const ::Ice::Current&)
     {
-        mManagement->suspend();
+        mGeneralState->mServiceManagement->suspend();
     }
 
     /**
@@ -241,7 +242,7 @@ public:
      */
     virtual void resume(const ::Ice::Current&)
     {
-        mManagement->unsuspend();
+        mGeneralState->mServiceManagement->unsuspend();
     }
 
     /**
@@ -260,10 +261,9 @@ private:
     MediaRTPpjmediaApp& mApplication;
 
     /**
-     * Our service locator management proxy, we'll use it to suspend and
-     * unsuspend ourselves.
+     * Pointer to general state information.
      */
-    ServiceManagementPrx& mManagement;
+    RtpGeneralStateItemPtr mGeneralState;
 };
 
 /**
@@ -426,17 +426,21 @@ void MediaRTPpjmediaApp::start(const std::string& name, const Ice::CommunicatorP
 	}
     }
 
-    RTPMediaServicePrx RTPMediaServiceProxy = RTPMediaServicePrx::uncheckedCast(mGlobalAdapter->addWithUUID(rtpmediaservice));
+    RTPMediaServicePrx RTPMediaServiceProxy = RTPMediaServicePrx::uncheckedCast(mGlobalAdapter->add(rtpmediaservice,
+	    mCommunicator->stringToIdentity(MediaServiceId)));
 
     ServiceLocatorParamsPtr genericparams = new ServiceLocatorParams();
 
-    mGeneralState->mServiceManagement = ServiceManagementPrx::uncheckedCast(management->addService(RTPMediaServiceProxy, "media_rtp_pjmedia"));
-    /* Now we can add some parameters to help find us. */
-    genericparams->category = "rtp";
-    mGeneralState->mServiceManagement->addLocatorParams(genericparams, "");
+    if (mReplicaService->isActive() == true)
+    {
+	mGeneralState->mServiceManagement = ServiceManagementPrx::uncheckedCast(management->addService(RTPMediaServiceProxy, "media_rtp_pjmedia"));
+	/* Now we can add some parameters to help find us. */
+	genericparams->category = "rtp";
+	mGeneralState->mServiceManagement->addLocatorParams(genericparams, "");
+    }
 
     /* One must provide a component service to manage us, if someone wants to */
-    ComponentServicePtr ComponentService = new ComponentServiceImpl(*this, mGeneralState->mServiceManagement);
+    ComponentServicePtr ComponentService = new ComponentServiceImpl(*this, mGeneralState);
     ComponentServicePrx ComponentServiceProxy = ComponentServicePrx::uncheckedCast(mLocalAdapter->addWithUUID(ComponentService));
 
     /* Let's add the component service to the service locator first */

commit 346b63d3c7a0621691dd6dcb33c0d9e6067b4845
Author: Joshua Colp <jcolp at digium.com>
Date:   Thu Dec 9 15:29:07 2010 -0400

    Add a general state item which is currently used to replicate the service locator proxy for the service.

diff --git a/local-slice/RtpStateReplicationIf.ice b/local-slice/RtpStateReplicationIf.ice
index 1e405e6..b449ee5 100644
--- a/local-slice/RtpStateReplicationIf.ice
+++ b/local-slice/RtpStateReplicationIf.ice
@@ -66,6 +66,11 @@ module V1
 	   idempotent RtpStateItemSeq getAllState();
    };
 
+   class RtpGeneralStateItem extends RtpStateItem
+   {
+       AsteriskSCF::Core::Discovery::V1::ServiceManagement *mServiceManagement;
+   };
+
    class RtpSessionStateItem extends RtpStateItem
    {
        Ice::Identity mSessionIdentity;
diff --git a/src/MediaRTPpjmedia.cpp b/src/MediaRTPpjmedia.cpp
index 407ffb3..42ecceb 100644
--- a/src/MediaRTPpjmedia.cpp
+++ b/src/MediaRTPpjmedia.cpp
@@ -16,6 +16,7 @@
 
 #include <Ice/Ice.h>
 #include <IceBox/IceBox.h>
+#include <IceUtil/UUID.h>
 
 #include <pjlib.h>
 #include <pjmedia.h>
@@ -158,6 +159,7 @@ private:
 class MediaRTPpjmediaApp : public IceBox::Service
 {
 public:
+    MediaRTPpjmediaApp() : mGeneralState(new RtpGeneralStateItem()) { mGeneralState->key = IceUtil::generateUUID(); };
     void start(const std::string&, const Ice::CommunicatorPtr&, const Ice::StringSeq&);
     void stop();
 
@@ -183,11 +185,6 @@ private:
     Ice::ObjectAdapterPtr mLoggerAdapter;
 
     /**
-     * A proxy to the service locator manager for the RTP media service.
-     */
-    ServiceManagementPrx mServiceManagement;
-
-    /**
      * A proxy to the service locator manager for the component service.
      */
     ServiceManagementPrx mComponentServiceManagement;
@@ -211,6 +208,11 @@ private:
      * A proxy to the state replicator.
      */
     AsteriskSCF::SmartProxy::SmartProxy<RtpStateReplicatorPrx> mStateReplicator;
+
+    /**
+     * An instance of the general state information class.
+     */
+    RtpGeneralStateItemPtr mGeneralState;
 };
 
 /**
@@ -387,16 +389,6 @@ void MediaRTPpjmediaApp::start(const std::string& name, const Ice::CommunicatorP
 
     ServiceLocatorManagementPrx management = ServiceLocatorManagementPrx::checkedCast(mCommunicator->propertyToProxy("ServiceLocatorManagementProxy"));
 
-    /* One must provide a component service to manage us, if someone wants to */
-    ComponentServicePtr ComponentService = new ComponentServiceImpl(*this, mServiceManagement);
-    ComponentServicePrx ComponentServiceProxy = ComponentServicePrx::uncheckedCast(mLocalAdapter->addWithUUID(ComponentService));
-
-    /* Let's add the component service to the service locator first */
-    mComponentServiceManagement = ServiceManagementPrx::uncheckedCast(management->addService(ComponentServiceProxy, "media_rtp_pjmedia"));
-    ServiceLocatorParamsPtr genericparams = new ServiceLocatorParams();
-    genericparams->category = "Component/media_rtp_pjmedia";
-    mComponentServiceManagement->addLocatorParams(genericparams, "");
-
     // The service locator is required for state replicator operation, so go ahead and find it
     ServiceLocatorPrx locator = ServiceLocatorPrx::checkedCast(mCommunicator->propertyToProxy("LocatorService.Proxy"));
 
@@ -419,7 +411,7 @@ void MediaRTPpjmediaApp::start(const std::string& name, const Ice::CommunicatorP
 
     if (mStateReplicator)
     {
-        mReplicatorListener = new RtpStateReplicatorListenerI(mGlobalAdapter, rtpmediaservice->getPoolFactory());
+        mReplicatorListener = new RtpStateReplicatorListenerI(mGlobalAdapter, rtpmediaservice->getPoolFactory(), mGeneralState);
         mReplicatorListenerProxy = RtpStateReplicatorListenerPrx::uncheckedCast(mLocalAdapter->addWithUUID(mReplicatorListener));
 
 	if (mCommunicator->getProperties()->getPropertyWithDefault("Rtp.StateReplicatorListener", "no") == "yes")
@@ -436,10 +428,30 @@ void MediaRTPpjmediaApp::start(const std::string& name, const Ice::CommunicatorP
 
     RTPMediaServicePrx RTPMediaServiceProxy = RTPMediaServicePrx::uncheckedCast(mGlobalAdapter->addWithUUID(rtpmediaservice));
 
-    mServiceManagement = ServiceManagementPrx::uncheckedCast(management->addService(RTPMediaServiceProxy, "media_rtp_pjmedia"));
+    ServiceLocatorParamsPtr genericparams = new ServiceLocatorParams();
+
+    mGeneralState->mServiceManagement = ServiceManagementPrx::uncheckedCast(management->addService(RTPMediaServiceProxy, "media_rtp_pjmedia"));
     /* Now we can add some parameters to help find us. */
     genericparams->category = "rtp";
-    mServiceManagement->addLocatorParams(genericparams, "");
+    mGeneralState->mServiceManagement->addLocatorParams(genericparams, "");
+
+    /* One must provide a component service to manage us, if someone wants to */
+    ComponentServicePtr ComponentService = new ComponentServiceImpl(*this, mGeneralState->mServiceManagement);
+    ComponentServicePrx ComponentServiceProxy = ComponentServicePrx::uncheckedCast(mLocalAdapter->addWithUUID(ComponentService));
+
+    /* Let's add the component service to the service locator first */
+    mComponentServiceManagement = ServiceManagementPrx::uncheckedCast(management->addService(ComponentServiceProxy, "media_rtp_pjmedia"));
+    genericparams->category = "Component/media_rtp_pjmedia";
+    mComponentServiceManagement->addLocatorParams(genericparams, "");
+
+    // Replicate general state information so the backup is ready
+    if (mStateReplicator && mReplicaService->isActive() == true)
+    {
+        RtpStateItemSeq items;
+	items.push_back(mGeneralState);
+        RtpStateReplicatorPrx oneway = RtpStateReplicatorPrx::uncheckedCast(mStateReplicator->ice_oneway());
+        oneway->setState(items);
+    }
 }
 
 /**
@@ -448,7 +460,10 @@ void MediaRTPpjmediaApp::start(const std::string& name, const Ice::CommunicatorP
 void MediaRTPpjmediaApp::stop()
 {
     mComponentServiceManagement->unregister();
-    mServiceManagement->unregister();
+    if (mReplicaService->isActive() == true)
+    {
+	mGeneralState-> mServiceManagement->unregister();
+    }
     mCommunicator->destroy();
 }
 
diff --git a/src/RTPSource.cpp b/src/RTPSource.cpp
index bb16518..3747677 100644
--- a/src/RTPSource.cpp
+++ b/src/RTPSource.cpp
@@ -207,7 +207,7 @@ static void receiveRTP(void *userdata, void *packet, pj_ssize_t size)
                 frames.push_back(frame);
             }
 
-            source->mImpl->mSourceStateItem->mSink->write(frames);
+	    source->mImpl->mSourceStateItem->mSink->write(frames);
         }
     }
 
diff --git a/src/RtpStateReplicator.h b/src/RtpStateReplicator.h
index 2e57db7..d3bc6c9 100644
--- a/src/RtpStateReplicator.h
+++ b/src/RtpStateReplicator.h
@@ -28,7 +28,7 @@ typedef IceUtil::Handle<RtpStateReplicatorI> RtpStateReplicatorIPtr;
 class RtpStateReplicatorListenerI : public RtpStateReplicatorListener
 {
 public:
-    RtpStateReplicatorListenerI(Ice::ObjectAdapterPtr, pj_pool_factory*);
+    RtpStateReplicatorListenerI(Ice::ObjectAdapterPtr, pj_pool_factory*, RtpGeneralStateItemPtr);
     ~RtpStateReplicatorListenerI();
     void stateRemoved(const Ice::StringSeq&, const Ice::Current&);
     void stateSet(const RtpStateItemSeq&, const Ice::Current&);
diff --git a/src/RtpStateReplicatorListener.cpp b/src/RtpStateReplicatorListener.cpp
index 23e0d33..f11b9d7 100644
--- a/src/RtpStateReplicatorListener.cpp
+++ b/src/RtpStateReplicatorListener.cpp
@@ -55,8 +55,8 @@ private:
 struct RtpStateReplicatorListenerImpl
 {
 public:
-    RtpStateReplicatorListenerImpl(Ice::ObjectAdapterPtr adapter, pj_pool_factory *poolFactory)
-        : mId(IceUtil::generateUUID()), mAdapter(adapter), mPoolFactory(poolFactory) {}
+    RtpStateReplicatorListenerImpl(Ice::ObjectAdapterPtr adapter, pj_pool_factory *poolFactory, RtpGeneralStateItemPtr generalState)
+        : mId(IceUtil::generateUUID()), mAdapter(adapter), mPoolFactory(poolFactory), mGeneralState(generalState) {}
     void removeStateNoticeImpl(const Ice::StringSeq& itemKeys)
     {
         for (Ice::StringSeq::const_iterator key = itemKeys.begin(); key != itemKeys.end(); ++key)
@@ -70,12 +70,17 @@ public:
         for (RtpStateItemSeq::const_iterator item = items.begin(); item != items.end(); ++item)
         {
             std::map<std::string, boost::shared_ptr<RtpStateReplicatorItem> >::iterator i = mStateItems.find((*item)->mSessionId);
+	    RtpGeneralStateItemPtr general;
 	    RtpSessionStateItemPtr session;
 	    RtpStreamSinkStateItemPtr sink;
 	    RtpStreamSourceStateItemPtr source;
 	    boost::shared_ptr<RtpStateReplicatorItem> localitem;
 
-	    if ((session = RtpSessionStateItemPtr::dynamicCast((*item))))
+	    if ((general = RtpGeneralStateItemPtr::dynamicCast((*item))))
+	    {
+		mGeneralState->mServiceManagement = general->mServiceManagement;
+	    }
+	    else if ((session = RtpSessionStateItemPtr::dynamicCast((*item))))
 	    {
 		if (i == mStateItems.end())
 		{
@@ -119,10 +124,11 @@ public:
     std::map<std::string, boost::shared_ptr<RtpStateReplicatorItem> > mStateItems;
     Ice::ObjectAdapterPtr mAdapter;
     pj_pool_factory *mPoolFactory;
+    RtpGeneralStateItemPtr mGeneralState;
 };
 
-RtpStateReplicatorListenerI::RtpStateReplicatorListenerI(Ice::ObjectAdapterPtr adapter, pj_pool_factory *poolFactory)
-    : mImpl(new RtpStateReplicatorListenerImpl(adapter, poolFactory)) {}
+RtpStateReplicatorListenerI::RtpStateReplicatorListenerI(Ice::ObjectAdapterPtr adapter, pj_pool_factory *poolFactory, RtpGeneralStateItemPtr generalState)
+    : mImpl(new RtpStateReplicatorListenerImpl(adapter, poolFactory, generalState)) {}
 
 RtpStateReplicatorListenerI::~RtpStateReplicatorListenerI()
 {

-----------------------------------------------------------------------


-- 
asterisk-scf/integration/media_rtp_pjmedia.git



More information about the asterisk-scf-commits mailing list