[asterisk-scf-commits] asterisk-scf/integration/media_rtp_pjmedia.git branch "replication" updated.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Thu Dec 9 13:42:09 CST 2010
branch "replication" has been updated
via 59bd7af369b179b4c6135d92ac7876d992402571 (commit)
via 346b63d3c7a0621691dd6dcb33c0d9e6067b4845 (commit)
from 6534181ea193e0e6c614ea14a71f10c84cbeed5a (commit)
Summary of changes:
local-slice/RtpStateReplicationIf.ice | 5 ++
src/MediaRTPpjmedia.cpp | 75 ++++++++++++++++++++------------
src/RTPSource.cpp | 2 +-
src/RtpStateReplicator.h | 2 +-
src/RtpStateReplicatorListener.cpp | 16 +++++--
5 files changed, 65 insertions(+), 35 deletions(-)
- Log -----------------------------------------------------------------
commit 59bd7af369b179b4c6135d92ac7876d992402571
Author: Joshua Colp <jcolp at digium.com>
Date: Thu Dec 9 15:40:02 2010 -0400
Use a predefined name for the RTP media service, and don't register with the service locator unless active.
diff --git a/src/MediaRTPpjmedia.cpp b/src/MediaRTPpjmedia.cpp
index 42ecceb..05801c5 100644
--- a/src/MediaRTPpjmedia.cpp
+++ b/src/MediaRTPpjmedia.cpp
@@ -50,6 +50,7 @@ Logger &lg = getLoggerFactory().getLogger("AsteriskSCF.MediaRTP");
}
static const string ReplicaServiceId("MediaRtpReplica");
+static const string MediaServiceId("RTPMediaService");
/**
* Implementation of the RTPMediaService interface as defined in MediaRTPIf.ice
@@ -224,7 +225,7 @@ public:
/**
* A constructor for this implementation which just sets a few variables, nothing extreme.
*/
- ComponentServiceImpl(MediaRTPpjmediaApp& app, ServiceManagementPrx& management) : mApplication(app), mManagement(management) { };
+ ComponentServiceImpl(MediaRTPpjmediaApp& app, RtpGeneralStateItemPtr generalState) : mApplication(app), mGeneralState(generalState) { };
/**
* An implementation of the suspend method which actually suspends ourselves
@@ -232,7 +233,7 @@ public:
*/
virtual void suspend(const ::Ice::Current&)
{
- mManagement->suspend();
+ mGeneralState->mServiceManagement->suspend();
}
/**
@@ -241,7 +242,7 @@ public:
*/
virtual void resume(const ::Ice::Current&)
{
- mManagement->unsuspend();
+ mGeneralState->mServiceManagement->unsuspend();
}
/**
@@ -260,10 +261,9 @@ private:
MediaRTPpjmediaApp& mApplication;
/**
- * Our service locator management proxy, we'll use it to suspend and
- * unsuspend ourselves.
+ * Pointer to general state information.
*/
- ServiceManagementPrx& mManagement;
+ RtpGeneralStateItemPtr mGeneralState;
};
/**
@@ -426,17 +426,21 @@ void MediaRTPpjmediaApp::start(const std::string& name, const Ice::CommunicatorP
}
}
- RTPMediaServicePrx RTPMediaServiceProxy = RTPMediaServicePrx::uncheckedCast(mGlobalAdapter->addWithUUID(rtpmediaservice));
+ RTPMediaServicePrx RTPMediaServiceProxy = RTPMediaServicePrx::uncheckedCast(mGlobalAdapter->add(rtpmediaservice,
+ mCommunicator->stringToIdentity(MediaServiceId)));
ServiceLocatorParamsPtr genericparams = new ServiceLocatorParams();
- mGeneralState->mServiceManagement = ServiceManagementPrx::uncheckedCast(management->addService(RTPMediaServiceProxy, "media_rtp_pjmedia"));
- /* Now we can add some parameters to help find us. */
- genericparams->category = "rtp";
- mGeneralState->mServiceManagement->addLocatorParams(genericparams, "");
+ if (mReplicaService->isActive() == true)
+ {
+ mGeneralState->mServiceManagement = ServiceManagementPrx::uncheckedCast(management->addService(RTPMediaServiceProxy, "media_rtp_pjmedia"));
+ /* Now we can add some parameters to help find us. */
+ genericparams->category = "rtp";
+ mGeneralState->mServiceManagement->addLocatorParams(genericparams, "");
+ }
/* One must provide a component service to manage us, if someone wants to */
- ComponentServicePtr ComponentService = new ComponentServiceImpl(*this, mGeneralState->mServiceManagement);
+ ComponentServicePtr ComponentService = new ComponentServiceImpl(*this, mGeneralState);
ComponentServicePrx ComponentServiceProxy = ComponentServicePrx::uncheckedCast(mLocalAdapter->addWithUUID(ComponentService));
/* Let's add the component service to the service locator first */
commit 346b63d3c7a0621691dd6dcb33c0d9e6067b4845
Author: Joshua Colp <jcolp at digium.com>
Date: Thu Dec 9 15:29:07 2010 -0400
Add a general state item which is currently used to replicate the service locator proxy for the service.
diff --git a/local-slice/RtpStateReplicationIf.ice b/local-slice/RtpStateReplicationIf.ice
index 1e405e6..b449ee5 100644
--- a/local-slice/RtpStateReplicationIf.ice
+++ b/local-slice/RtpStateReplicationIf.ice
@@ -66,6 +66,11 @@ module V1
idempotent RtpStateItemSeq getAllState();
};
+ class RtpGeneralStateItem extends RtpStateItem
+ {
+ AsteriskSCF::Core::Discovery::V1::ServiceManagement *mServiceManagement;
+ };
+
class RtpSessionStateItem extends RtpStateItem
{
Ice::Identity mSessionIdentity;
diff --git a/src/MediaRTPpjmedia.cpp b/src/MediaRTPpjmedia.cpp
index 407ffb3..42ecceb 100644
--- a/src/MediaRTPpjmedia.cpp
+++ b/src/MediaRTPpjmedia.cpp
@@ -16,6 +16,7 @@
#include <Ice/Ice.h>
#include <IceBox/IceBox.h>
+#include <IceUtil/UUID.h>
#include <pjlib.h>
#include <pjmedia.h>
@@ -158,6 +159,7 @@ private:
class MediaRTPpjmediaApp : public IceBox::Service
{
public:
+ MediaRTPpjmediaApp() : mGeneralState(new RtpGeneralStateItem()) { mGeneralState->key = IceUtil::generateUUID(); };
void start(const std::string&, const Ice::CommunicatorPtr&, const Ice::StringSeq&);
void stop();
@@ -183,11 +185,6 @@ private:
Ice::ObjectAdapterPtr mLoggerAdapter;
/**
- * A proxy to the service locator manager for the RTP media service.
- */
- ServiceManagementPrx mServiceManagement;
-
- /**
* A proxy to the service locator manager for the component service.
*/
ServiceManagementPrx mComponentServiceManagement;
@@ -211,6 +208,11 @@ private:
* A proxy to the state replicator.
*/
AsteriskSCF::SmartProxy::SmartProxy<RtpStateReplicatorPrx> mStateReplicator;
+
+ /**
+ * An instance of the general state information class.
+ */
+ RtpGeneralStateItemPtr mGeneralState;
};
/**
@@ -387,16 +389,6 @@ void MediaRTPpjmediaApp::start(const std::string& name, const Ice::CommunicatorP
ServiceLocatorManagementPrx management = ServiceLocatorManagementPrx::checkedCast(mCommunicator->propertyToProxy("ServiceLocatorManagementProxy"));
- /* One must provide a component service to manage us, if someone wants to */
- ComponentServicePtr ComponentService = new ComponentServiceImpl(*this, mServiceManagement);
- ComponentServicePrx ComponentServiceProxy = ComponentServicePrx::uncheckedCast(mLocalAdapter->addWithUUID(ComponentService));
-
- /* Let's add the component service to the service locator first */
- mComponentServiceManagement = ServiceManagementPrx::uncheckedCast(management->addService(ComponentServiceProxy, "media_rtp_pjmedia"));
- ServiceLocatorParamsPtr genericparams = new ServiceLocatorParams();
- genericparams->category = "Component/media_rtp_pjmedia";
- mComponentServiceManagement->addLocatorParams(genericparams, "");
-
// The service locator is required for state replicator operation, so go ahead and find it
ServiceLocatorPrx locator = ServiceLocatorPrx::checkedCast(mCommunicator->propertyToProxy("LocatorService.Proxy"));
@@ -419,7 +411,7 @@ void MediaRTPpjmediaApp::start(const std::string& name, const Ice::CommunicatorP
if (mStateReplicator)
{
- mReplicatorListener = new RtpStateReplicatorListenerI(mGlobalAdapter, rtpmediaservice->getPoolFactory());
+ mReplicatorListener = new RtpStateReplicatorListenerI(mGlobalAdapter, rtpmediaservice->getPoolFactory(), mGeneralState);
mReplicatorListenerProxy = RtpStateReplicatorListenerPrx::uncheckedCast(mLocalAdapter->addWithUUID(mReplicatorListener));
if (mCommunicator->getProperties()->getPropertyWithDefault("Rtp.StateReplicatorListener", "no") == "yes")
@@ -436,10 +428,30 @@ void MediaRTPpjmediaApp::start(const std::string& name, const Ice::CommunicatorP
RTPMediaServicePrx RTPMediaServiceProxy = RTPMediaServicePrx::uncheckedCast(mGlobalAdapter->addWithUUID(rtpmediaservice));
- mServiceManagement = ServiceManagementPrx::uncheckedCast(management->addService(RTPMediaServiceProxy, "media_rtp_pjmedia"));
+ ServiceLocatorParamsPtr genericparams = new ServiceLocatorParams();
+
+ mGeneralState->mServiceManagement = ServiceManagementPrx::uncheckedCast(management->addService(RTPMediaServiceProxy, "media_rtp_pjmedia"));
/* Now we can add some parameters to help find us. */
genericparams->category = "rtp";
- mServiceManagement->addLocatorParams(genericparams, "");
+ mGeneralState->mServiceManagement->addLocatorParams(genericparams, "");
+
+ /* One must provide a component service to manage us, if someone wants to */
+ ComponentServicePtr ComponentService = new ComponentServiceImpl(*this, mGeneralState->mServiceManagement);
+ ComponentServicePrx ComponentServiceProxy = ComponentServicePrx::uncheckedCast(mLocalAdapter->addWithUUID(ComponentService));
+
+ /* Let's add the component service to the service locator first */
+ mComponentServiceManagement = ServiceManagementPrx::uncheckedCast(management->addService(ComponentServiceProxy, "media_rtp_pjmedia"));
+ genericparams->category = "Component/media_rtp_pjmedia";
+ mComponentServiceManagement->addLocatorParams(genericparams, "");
+
+ // Replicate general state information so the backup is ready
+ if (mStateReplicator && mReplicaService->isActive() == true)
+ {
+ RtpStateItemSeq items;
+ items.push_back(mGeneralState);
+ RtpStateReplicatorPrx oneway = RtpStateReplicatorPrx::uncheckedCast(mStateReplicator->ice_oneway());
+ oneway->setState(items);
+ }
}
/**
@@ -448,7 +460,10 @@ void MediaRTPpjmediaApp::start(const std::string& name, const Ice::CommunicatorP
void MediaRTPpjmediaApp::stop()
{
mComponentServiceManagement->unregister();
- mServiceManagement->unregister();
+ if (mReplicaService->isActive() == true)
+ {
+ mGeneralState-> mServiceManagement->unregister();
+ }
mCommunicator->destroy();
}
diff --git a/src/RTPSource.cpp b/src/RTPSource.cpp
index bb16518..3747677 100644
--- a/src/RTPSource.cpp
+++ b/src/RTPSource.cpp
@@ -207,7 +207,7 @@ static void receiveRTP(void *userdata, void *packet, pj_ssize_t size)
frames.push_back(frame);
}
- source->mImpl->mSourceStateItem->mSink->write(frames);
+ source->mImpl->mSourceStateItem->mSink->write(frames);
}
}
diff --git a/src/RtpStateReplicator.h b/src/RtpStateReplicator.h
index 2e57db7..d3bc6c9 100644
--- a/src/RtpStateReplicator.h
+++ b/src/RtpStateReplicator.h
@@ -28,7 +28,7 @@ typedef IceUtil::Handle<RtpStateReplicatorI> RtpStateReplicatorIPtr;
class RtpStateReplicatorListenerI : public RtpStateReplicatorListener
{
public:
- RtpStateReplicatorListenerI(Ice::ObjectAdapterPtr, pj_pool_factory*);
+ RtpStateReplicatorListenerI(Ice::ObjectAdapterPtr, pj_pool_factory*, RtpGeneralStateItemPtr);
~RtpStateReplicatorListenerI();
void stateRemoved(const Ice::StringSeq&, const Ice::Current&);
void stateSet(const RtpStateItemSeq&, const Ice::Current&);
diff --git a/src/RtpStateReplicatorListener.cpp b/src/RtpStateReplicatorListener.cpp
index 23e0d33..f11b9d7 100644
--- a/src/RtpStateReplicatorListener.cpp
+++ b/src/RtpStateReplicatorListener.cpp
@@ -55,8 +55,8 @@ private:
struct RtpStateReplicatorListenerImpl
{
public:
- RtpStateReplicatorListenerImpl(Ice::ObjectAdapterPtr adapter, pj_pool_factory *poolFactory)
- : mId(IceUtil::generateUUID()), mAdapter(adapter), mPoolFactory(poolFactory) {}
+ RtpStateReplicatorListenerImpl(Ice::ObjectAdapterPtr adapter, pj_pool_factory *poolFactory, RtpGeneralStateItemPtr generalState)
+ : mId(IceUtil::generateUUID()), mAdapter(adapter), mPoolFactory(poolFactory), mGeneralState(generalState) {}
void removeStateNoticeImpl(const Ice::StringSeq& itemKeys)
{
for (Ice::StringSeq::const_iterator key = itemKeys.begin(); key != itemKeys.end(); ++key)
@@ -70,12 +70,17 @@ public:
for (RtpStateItemSeq::const_iterator item = items.begin(); item != items.end(); ++item)
{
std::map<std::string, boost::shared_ptr<RtpStateReplicatorItem> >::iterator i = mStateItems.find((*item)->mSessionId);
+ RtpGeneralStateItemPtr general;
RtpSessionStateItemPtr session;
RtpStreamSinkStateItemPtr sink;
RtpStreamSourceStateItemPtr source;
boost::shared_ptr<RtpStateReplicatorItem> localitem;
- if ((session = RtpSessionStateItemPtr::dynamicCast((*item))))
+ if ((general = RtpGeneralStateItemPtr::dynamicCast((*item))))
+ {
+ mGeneralState->mServiceManagement = general->mServiceManagement;
+ }
+ else if ((session = RtpSessionStateItemPtr::dynamicCast((*item))))
{
if (i == mStateItems.end())
{
@@ -119,10 +124,11 @@ public:
std::map<std::string, boost::shared_ptr<RtpStateReplicatorItem> > mStateItems;
Ice::ObjectAdapterPtr mAdapter;
pj_pool_factory *mPoolFactory;
+ RtpGeneralStateItemPtr mGeneralState;
};
-RtpStateReplicatorListenerI::RtpStateReplicatorListenerI(Ice::ObjectAdapterPtr adapter, pj_pool_factory *poolFactory)
- : mImpl(new RtpStateReplicatorListenerImpl(adapter, poolFactory)) {}
+RtpStateReplicatorListenerI::RtpStateReplicatorListenerI(Ice::ObjectAdapterPtr adapter, pj_pool_factory *poolFactory, RtpGeneralStateItemPtr generalState)
+ : mImpl(new RtpStateReplicatorListenerImpl(adapter, poolFactory, generalState)) {}
RtpStateReplicatorListenerI::~RtpStateReplicatorListenerI()
{
-----------------------------------------------------------------------
--
asterisk-scf/integration/media_rtp_pjmedia.git
More information about the asterisk-scf-commits mailing list