[asterisk-scf-commits] asterisk-scf/release/media_rtp_pjmedia.git branch "master" updated.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Mon May 23 10:19:02 CDT 2011
branch "master" has been updated
via 1fa1d2704fd2b6c31f0f72c6c8d7e44b49efb31a (commit)
via 1476d4d52c1cf76a287c34031310d3c71b30bf5b (commit)
via 468c255b5ece37590cdbdd1121541dfc305f2d95 (commit)
via 70dab79e95c3da49b6a1ea7ed8564530fd57a389 (commit)
via a9f2d2c2f87162aaeec2d1080b860e059fe31cbd (commit)
via cd8b75e4635108c0a032348c1c159441ffbbb26f (commit)
via 7c62f27966769e9d75d22eb80044d055bc4dd195 (commit)
via 590ca45f3604f215b8019bf0f8d9cc9a8f952b33 (commit)
via 4740d0813de6b66cd70000b8b7c10dd883520ac6 (commit)
via e04d914ecae9eb1c24a87fb18d8f93b46335c760 (commit)
via 6eaa470cd582ad9231eb69aa335312a880cce9fe (commit)
via 366fae18463169b226613285ba70842374c718ab (commit)
via c85c80143e620332470b3d82df17f4d9e65140b1 (commit)
via f27572008be8a6a37d8aaee4e8a7b29fb8e7339c (commit)
via d418b0865dc86ba0d4a4f522a05ae685d77fb14f (commit)
via 014b58607f05d3c99be3e5edf26a81545a04ed52 (commit)
via 8c950ee6637816c1a07b9864a8a9fdb627cb632c (commit)
via 5ef3067c4fd3bc3c6036f0a6ac3c1bbfe7d7d488 (commit)
via 6bc19fc030ca626b7b86b1ba39098d5a7fbaec60 (commit)
via 0bde61f3c48d1fa13fd58115ca609f2c9a8fe954 (commit)
via 8c0401743b4bb5b0f6db9533631d6aa0822cd975 (commit)
via 7bd1a2b33fc2c8037d4c79ea204a29ab72072414 (commit)
via eed17133034e54adb065166ad1795a62c2d73f41 (commit)
via 1467fd724497c5b072d3fafdc03b9d52bb9cb731 (commit)
from 2fe8c6154989313a4f40eb45cccd5b946f8f495b (commit)
Summary of changes:
config/RtpConfigurator.py | 6 ++-
config/test_component.config | 8 ++
config/test_component_v6.config | 8 ++
local-slice/RtpConfigurationIf.ice | 18 +++++
local-slice/RtpStateReplicationIf.ice | 1 +
src/CMakeLists.txt | 2 +
src/MediaRTPpjmedia.cpp | 87 ++++++++++++++++++++++--
src/RtpStateReplicator.h | 1 +
src/RtpStateReplicatorApp.cpp | 119 +++++++++++++++++++++++++++++++++
9 files changed, 243 insertions(+), 7 deletions(-)
- Log -----------------------------------------------------------------
commit 1fa1d2704fd2b6c31f0f72c6c8d7e44b49efb31a
Author: Joshua Colp <jcolp at digium.com>
Date: Mon May 23 12:00:40 2011 -0300
Update test configuration file to work with configuration replicator.
diff --git a/config/test_component_v6.config b/config/test_component_v6.config
index 250ad0e..36021d4 100644
--- a/config/test_component_v6.config
+++ b/config/test_component_v6.config
@@ -56,6 +56,14 @@ AsteriskSCFIceStorm.Transient=1
AsteriskSCFIceStorm.Flush.Timeout=2000
TopicManager.Proxy=AsteriskSCFIceStorm/TopicManager:default -p 10000
+RtpStateReplicatorIceStorm.InstanceName=RtpStateReplicatorIceStorm
+RtpStateReplicatorIceStorm.TopicManager.Endpoints=default -p 10005
+RtpStateReplicatorIceStorm.Publish.Endpoints=default -p 10006
+RtpStateReplicatorIceStorm.Trace.TopicManager=2
+RtpStateReplicatorIceStorm.Transient=1
+RtpStateReplicatorIceStorm.Flush.Timeout=2000
+RtpStateReplicatorTopicManager.Proxy=RtpStateReplicatorIceStorm/TopicManager:default -p 10005
+
ServiceLocatorManagementAdapter.Endpoints=tcp -p 4422
ServiceLocatorAdapter.Endpoints=tcp -p 4411
ServiceLocatorLocalAdapter.Endpoints=tcp -p 4412
commit 1476d4d52c1cf76a287c34031310d3c71b30bf5b
Author: Joshua Colp <jcolp at digium.com>
Date: Mon May 23 11:43:54 2011 -0300
Update component to use new configuration replicator interface which is presented using a facet on the state replicator.
diff --git a/local-slice/RtpStateReplicationIf.ice b/local-slice/RtpStateReplicationIf.ice
index 385b5a1..a8b4ac8 100644
--- a/local-slice/RtpStateReplicationIf.ice
+++ b/local-slice/RtpStateReplicationIf.ice
@@ -70,7 +70,6 @@ module V1
void removeState(Ice::StringSeq items);
idempotent RtpStateItemSeq getState(Ice::StringSeq itemKeys);
idempotent RtpStateItemSeq getAllState();
- void registerConfigurationService(AsteriskSCF::System::Configuration::V1::ConfigurationService *service);
};
class RtpGeneralStateItem extends RtpStateItem
diff --git a/src/MediaRTPpjmedia.cpp b/src/MediaRTPpjmedia.cpp
index e106028..b05c352 100644
--- a/src/MediaRTPpjmedia.cpp
+++ b/src/MediaRTPpjmedia.cpp
@@ -534,7 +534,9 @@ void MediaRTPpjmediaApp::start(const std::string&, const Ice::CommunicatorPtr& c
}
else if (mStateReplicator)
{
- mStateReplicator->registerConfigurationService(mConfigurationServiceProxy);
+ ConfigurationReplicatorPrx configurationReplicator = ConfigurationReplicatorPrx::checkedCast(
+ mStateReplicator.initialize(), ReplicatorFacet);
+ configurationReplicator->registerConfigurationService(mConfigurationServiceProxy);
}
if (mStateReplicator)
diff --git a/src/RtpStateReplicatorApp.cpp b/src/RtpStateReplicatorApp.cpp
index a65f52b..189e809 100644
--- a/src/RtpStateReplicatorApp.cpp
+++ b/src/RtpStateReplicatorApp.cpp
@@ -26,6 +26,7 @@
#include <AsteriskSCF/Logger/IceLogger.h>
#include <AsteriskSCF/logger.h>
#include <AsteriskSCF/CollocatedIceStorm/CollocatedIceStorm.h>
+#include <AsteriskSCF/System/Component/ConfigurationIf.h>
#include "RtpConfigurationIf.h"
#include "RtpStateReplicator.h"
@@ -38,6 +39,7 @@ using namespace AsteriskSCF::System::Logging;
using namespace AsteriskSCF::Media::RTP::V1;
using namespace AsteriskSCF::Media::RTP;
using namespace AsteriskSCF::CollocatedIceStorm;
+using namespace AsteriskSCF::System::Configuration::V1;
namespace
{
@@ -53,6 +55,7 @@ public:
mComponentService = 0;
mAdapter = 0;
mStateReplicator = 0;
+ mConfigurationReplicator = 0;
};
virtual void start(const string &name, const Ice::CommunicatorPtr& ic, const Ice::StringSeq& args);
virtual void stop();
@@ -68,6 +71,7 @@ private:
ConfiguredIceLoggerPtr mIceLogger;
ComponentServicePtr mComponentService;
RtpStateReplicatorIPtr mStateReplicator;
+ ConfigurationReplicatorPtr mConfigurationReplicator;
CollocatedIceStormPtr mIceStorm;
Ice::ObjectPrx mConfigurationPublisher;
Discovery::V1::ServiceManagementPrx mConfigurationManagement;
@@ -144,16 +148,16 @@ private:
typedef IceUtil::Handle<RtpConfigurationCompare> RtpConfigurationComparePtr;
-class RtpStateReplicatorConfigI : public RtpStateReplicatorI
+class ConfigurationReplicatorI : public ConfigurationReplicator
{
public:
- RtpStateReplicatorConfigI(const IceStorm::TopicPrx& topic) : mConfigurationReplicationTopic(topic) { };
+ ConfigurationReplicatorI(const IceStorm::TopicPrx& topic) : mConfigurationReplicationTopic(topic) { };
void registerConfigurationService(const AsteriskSCF::System::Configuration::V1::ConfigurationServicePrx&, const Ice::Current&);
private:
IceStorm::TopicPrx mConfigurationReplicationTopic;
};
-void RtpStateReplicatorConfigI::registerConfigurationService(const AsteriskSCF::System::Configuration::V1::ConfigurationServicePrx& service, const Ice::Current&)
+void ConfigurationReplicatorI::registerConfigurationService(const AsteriskSCF::System::Configuration::V1::ConfigurationServicePrx& service, const Ice::Current&)
{
if (mConfigurationReplicationTopic)
{
@@ -316,8 +320,11 @@ void RtpStateReplicatorService::initialize(const string& appName, const Ice::Com
// Create and publish our ComponentService interface support.
mComponentService = new ComponentServiceImpl(*this);
mAdapter->add(mComponentService, ic->stringToIdentity(ComponentServiceId));
- mStateReplicator = new RtpStateReplicatorConfigI(topic);
+ mStateReplicator = new RtpStateReplicatorI();
mAdapter->add(mStateReplicator, ic->stringToIdentity(ServiceDiscoveryId));
+ mConfigurationReplicator = new ConfigurationReplicatorI(topic);
+ mAdapter->addFacet(mConfigurationReplicator, ic->stringToIdentity(ServiceDiscoveryId),
+ ReplicatorFacet);
mAdapter->activate();
}
commit 468c255b5ece37590cdbdd1121541dfc305f2d95
Merge: 70dab79 2fe8c61
Author: Joshua Colp <jcolp at digium.com>
Date: Mon May 23 11:22:25 2011 -0300
Merge branch 'master' into configuration-replication
Conflicts:
config/RtpConfigurator.py
config/test_media_rtp_pjmedia.conf
local-slice/RtpConfigurationIf.ice
src/MediaRTPpjmedia.cpp
src/RTPSession.cpp
src/RTPSession.h
src/RtpStateReplicator.h
src/RtpStateReplicatorListener.cpp
diff --cc src/MediaRTPpjmedia.cpp
index 3c397e0,9989ddb..e106028
--- a/src/MediaRTPpjmedia.cpp
+++ b/src/MediaRTPpjmedia.cpp
@@@ -525,6 -492,17 +557,17 @@@ void MediaRTPpjmediaApp::start(const st
}
}
+ ServiceLocatorParamsComparePtr rtpmediacomparatorservice = new RTPMediaServiceCompareServiceImpl();
+ ServiceLocatorParamsComparePrx RTPMediaComparatorServiceProxy = ServiceLocatorParamsComparePrx::uncheckedCast(
+ mGlobalAdapter->add(rtpmediacomparatorservice, mCommunicator->stringToIdentity(MediaComparatorServiceId)));
+
+ if (mReplicaService->isActive() == true)
+ {
+ mGeneralState->mComparatorId = IceUtil::generateUUID();
- management->addCompare(mGeneralState->mComparatorId, RTPMediaComparatorServiceProxy);
++ mManagement->addCompare(mGeneralState->mComparatorId, RTPMediaComparatorServiceProxy);
+ }
+
+
RTPMediaServicePrx RTPMediaServiceProxy = RTPMediaServicePrx::uncheckedCast(mGlobalAdapter->add(rtpmediaservice,
mCommunicator->stringToIdentity(MediaServiceId)));
@@@ -533,12 -511,14 +576,14 @@@
if (mReplicaService->isActive() == true)
{
mGeneralState->mServiceManagement = ServiceManagementPrx::uncheckedCast(
- management->addService(RTPMediaServiceProxy, "media_rtp_pjmedia"));
+ mManagement->addService(RTPMediaServiceProxy, "media_rtp_pjmedia"));
/* Now we can add some parameters to help find us. */
- genericparams->category = "rtp";
- mGeneralState->mServiceManagement->addLocatorParams(genericparams, "");
+ rtpparams->category = "rtp";
+ mGeneralState->mServiceManagement->addLocatorParams(rtpparams, mGeneralState->mComparatorId);
}
+ ServiceLocatorParamsPtr genericparams = new ServiceLocatorParams();
+
/* One must provide a component service to manage us, if someone wants to */
ComponentServicePtr ComponentService = new ComponentServicepjmediaImpl(*this, mGeneralState);
ComponentServicePrx ComponentServiceProxy =
@@@ -569,14 -549,9 +614,17 @@@ void MediaRTPpjmediaApp::stop(
if (mReplicaService->isActive() == true)
{
mGeneralState->mServiceManagement->unregister();
+ }
+ if (mConfigurationManagement)
+ {
+ mConfigurationManagement->unregister();
+ }
+ if (!mConfigCompareGuid.empty())
+ {
+ mManagement->removeCompare(mConfigCompareGuid);
+ ServiceLocatorManagementPrx management =
+ ServiceLocatorManagementPrx::checkedCast(mCommunicator->propertyToProxy("ServiceLocatorManagementProxy"));
+ management->removeCompare(mGeneralState->mComparatorId);
}
mCommunicator->destroy();
}
commit 70dab79e95c3da49b6a1ea7ed8564530fd57a389
Author: Joshua Colp <jcolp at digium.com>
Date: Tue May 17 10:24:33 2011 -0300
Use the collocated icestorm stuff from ice-util-cpp.
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 15c7630..90b667d 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -34,8 +34,6 @@ asterisk_scf_component_install(media_rtp_pjmedia)
asterisk_scf_component_init(RtpStateReplicator)
asterisk_scf_component_add_file(RtpStateReplicator RtpStateReplicatorApp.cpp)
asterisk_scf_component_add_file(RtpStateReplicator RtpStateReplicator.h)
-asterisk_scf_component_add_file(RtpStateReplicator CollocatedIceStorm.cpp)
-asterisk_scf_component_add_file(RtpStateReplicator CollocatedIceStorm.h)
asterisk_scf_component_add_slice(RtpStateReplicator ../local-slice/RtpStateReplicationIf.ice)
asterisk_scf_component_add_slice(RtpStateReplicator ../local-slice/RtpConfigurationIf.ice)
asterisk_scf_component_add_ice_libraries(RtpStateReplicator IceStorm)
@@ -43,4 +41,5 @@ asterisk_scf_component_add_boost_libraries(RtpStateReplicator thread date_time)
asterisk_scf_component_build_icebox(RtpStateReplicator)
target_link_libraries(RtpStateReplicator asterisk-scf-api)
target_link_libraries(RtpStateReplicator logging-client)
+target_link_libraries(RtpStateReplicator ice-util-cpp)
asterisk_scf_component_install(RtpStateReplicator)
diff --git a/src/CollocatedIceStorm.cpp b/src/CollocatedIceStorm.cpp
deleted file mode 100644
index 84dbe37..0000000
--- a/src/CollocatedIceStorm.cpp
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Asterisk SCF -- An open-source communications framework.
- *
- * Copyright (C) 2010, Digium, Inc.
- *
- * See http://www.asterisk.org for more information about
- * the Asterisk SCF project. Please do not directly contact
- * any of the maintainers of this project for assistance;
- * the project provides a web site, mailing lists and IRC
- * channels for your use.
- *
- * This program is free software, distributed under the terms of
- * the GNU General Public License Version 2. See the LICENSE.txt file
- * at the top of the source tree.
- */
-#include <Ice/Ice.h>
-#include <IceStorm/IceStorm.h>
-#include <assert.h>
-#include <algorithm>
-#include "CollocatedIceStorm.h"
-
-using namespace AsteriskSCF::Media::RTP;
-
-//
-// The idea behind this class is that it needs to access the entry point that IceBox would
-// have used and then invoke the methods that are needed to start and stop the IceStorm
-// service.
-//
-typedef IceBox::Service* (*FACTORY)(Ice::CommunicatorPtr);
-
-CollocatedIceStorm::CollocatedIceStorm(const std::string& namePrefix, const Ice::PropertiesPtr& properties) :
- mStopped(false)
-{
- //
- // We create our own communicator to avoid issues with call order on shutdown.
- //
- Ice::InitializationData initData;
- initData.properties = properties;
- mCommunicator = Ice::initialize(initData);
-
- std::string loadString = mCommunicator->getProperties()->getPropertyWithDefault("IceStorm.EntryPoint", "IceStormService:createIceStorm");
-
- mLibrary = new IceUtilInternal::DynamicLibrary;
- IceUtilInternal::DynamicLibrary::symbol_type entry = mLibrary->loadEntryPoint(loadString);
- if(entry == 0)
- {
- throw mLibrary->getErrorMessage();
- }
- FACTORY factory = (FACTORY)entry;
- mService = factory(mCommunicator);
- assert(mService != 0);
- Ice::StringSeq options;
- mService->start(namePrefix, mCommunicator, options);
-}
-
-CollocatedIceStorm::~CollocatedIceStorm()
-{
- if(!mStopped)
- {
- try
- {
- stop();
- mCommunicator->destroy();
- }
- catch(...)
- {
- }
- }
-}
-
-void CollocatedIceStorm::stop()
-{
- //
- // NOTE: there isn't any mutex protection here. It can be added later if needed, but at the moment multiple threads
- // do not have access to this object instance.
- //
- if(!mStopped)
- {
- if(mService)
- {
- mService->stop();
- }
- mCommunicator->shutdown();
- mCommunicator->waitForShutdown();
- mStopped = true;
- }
-}
diff --git a/src/CollocatedIceStorm.h b/src/CollocatedIceStorm.h
deleted file mode 100644
index 7481fa5..0000000
--- a/src/CollocatedIceStorm.h
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Asterisk SCF -- An open-source communications framework.
- *
- * Copyright (C) 2010, Digium, Inc.
- *
- * See http://www.asterisk.org for more information about
- * the Asterisk SCF project. Please do not directly contact
- * any of the maintainers of this project for assistance;
- * the project provides a web site, mailing lists and IRC
- * channels for your use.
- *
- * This program is free software, distributed under the terms of
- * the GNU General Public License Version 2. See the LICENSE.txt file
- * at the top of the source tree.
- */
-#pragma once
-
-#include <IceUtil/DynamicLibrary.h>
-#include <Ice/Service.h>
-#include <IceBox/IceBox.h>
-#include <Ice/Ice.h>
-#include <IceStorm/IceStorm.h>
-#include <string>
-
-namespace AsteriskSCF
-{
-namespace Media
-{
-namespace RTP
-{
-
-/**
- * A helper class that instantiates IceStorm in-process, removing the need to launch
- * a separate process to access IceStorm services.
- */
-
-class CollocatedIceStorm : public IceUtil::Shared
-{
-public:
- CollocatedIceStorm(const std::string&, const Ice::PropertiesPtr&);
- ~CollocatedIceStorm();
-
- /**
- * "nice" applications should explictly call stop !
- */
- void stop();
-
-private:
- IceUtilInternal::DynamicLibraryPtr mLibrary;
- IceBox::ServicePtr mService;
- Ice::CommunicatorPtr mCommunicator;
- bool mStopped;
-};
-
-typedef IceUtil::Handle<CollocatedIceStorm> CollocatedIceStormPtr;
-
-} /* end of RTP */
-} /* end of Media */
-} /* end of AsteriskSCF */
-
diff --git a/src/RtpStateReplicatorApp.cpp b/src/RtpStateReplicatorApp.cpp
index 123b368..a65f52b 100644
--- a/src/RtpStateReplicatorApp.cpp
+++ b/src/RtpStateReplicatorApp.cpp
@@ -25,10 +25,10 @@
#include <AsteriskSCF/System/Component/ComponentServiceIf.h>
#include <AsteriskSCF/Logger/IceLogger.h>
#include <AsteriskSCF/logger.h>
+#include <AsteriskSCF/CollocatedIceStorm/CollocatedIceStorm.h>
#include "RtpConfigurationIf.h"
#include "RtpStateReplicator.h"
-#include "CollocatedIceStorm.h"
using namespace std;
using namespace AsteriskSCF::Core;
@@ -37,6 +37,7 @@ using namespace AsteriskSCF::System::Component::V1;
using namespace AsteriskSCF::System::Logging;
using namespace AsteriskSCF::Media::RTP::V1;
using namespace AsteriskSCF::Media::RTP;
+using namespace AsteriskSCF::CollocatedIceStorm;
namespace
{
commit a9f2d2c2f87162aaeec2d1080b860e059fe31cbd
Author: Joshua Colp <jcolp at digium.com>
Date: Tue May 17 09:01:32 2011 -0300
Use the new service locator support in Configurator.
diff --git a/config/RtpConfigurator.py b/config/RtpConfigurator.py
index 87b3f65..56d2bff 100755
--- a/config/RtpConfigurator.py
+++ b/config/RtpConfigurator.py
@@ -47,6 +47,10 @@ class RtpSectionVisitors(Configurator.SectionVisitors):
self.groups.append(group)
+# In order to do service locator based lookup we need to pass in a params object
+serviceLocatorParams = AsteriskSCF.Media.RTP.V1.RtpConfigurationParams()
+serviceLocatorParams.category = AsteriskSCF.Media.RTP.V1.ConfigurationDiscoveryCategory
+
# Make a configurator application and let it run
-app = Configurator.ConfiguratorApp('Rtp.config', RtpSectionVisitors())
+app = Configurator.ConfiguratorApp('Rtp.config', RtpSectionVisitors(), None, serviceLocatorParams)
sys.exit(app.main(sys.argv))
commit cd8b75e4635108c0a032348c1c159441ffbbb26f
Author: Joshua Colp <jcolp at digium.com>
Date: Tue May 17 08:44:00 2011 -0300
Fix logic malfunction. When operating in a replica group our configuration service should always be registered.
diff --git a/src/MediaRTPpjmedia.cpp b/src/MediaRTPpjmedia.cpp
index a6be66a..3c397e0 100644
--- a/src/MediaRTPpjmedia.cpp
+++ b/src/MediaRTPpjmedia.cpp
@@ -500,6 +500,10 @@ void MediaRTPpjmediaApp::start(const std::string&, const Ice::CommunicatorPtr& c
mManagement->addCompare(mConfigCompareGuid, configCompareProxy);
mConfigurationManagement->addLocatorParams(configurationParams, mConfigCompareGuid);
}
+ else if (mStateReplicator)
+ {
+ mStateReplicator->registerConfigurationService(mConfigurationServiceProxy);
+ }
if (mStateReplicator)
{
@@ -512,10 +516,6 @@ void MediaRTPpjmediaApp::start(const std::string&, const Ice::CommunicatorPtr& c
if (mCommunicator->getProperties()->getPropertyWithDefault("Rtp.StateReplicatorListener", "no") == "yes")
{
mStateReplicator->addListener(mReplicatorListenerProxy);
- if (mCommunicator->getProperties()->getPropertyWithDefault("Rtp.Standalone", "false") == "false")
- {
- mStateReplicator->registerConfigurationService(mConfigurationServiceProxy);
- }
mReplicaService->standby();
lg(Info) << "Operating as a standby replica." << endl;
}
commit 7c62f27966769e9d75d22eb80044d055bc4dd195
Author: Joshua Colp <jcolp at digium.com>
Date: Tue May 17 08:40:03 2011 -0300
Only unregister the configuration service from the service locator if it was actually added.
diff --git a/src/MediaRTPpjmedia.cpp b/src/MediaRTPpjmedia.cpp
index 97f3bbd..a6be66a 100644
--- a/src/MediaRTPpjmedia.cpp
+++ b/src/MediaRTPpjmedia.cpp
@@ -570,7 +570,10 @@ void MediaRTPpjmediaApp::stop()
{
mGeneralState->mServiceManagement->unregister();
}
- mConfigurationManagement->unregister();
+ if (mConfigurationManagement)
+ {
+ mConfigurationManagement->unregister();
+ }
if (!mConfigCompareGuid.empty())
{
mManagement->removeCompare(mConfigCompareGuid);
commit 590ca45f3604f215b8019bf0f8d9cc9a8f952b33
Author: Joshua Colp <jcolp at digium.com>
Date: Tue May 17 08:36:00 2011 -0300
Update test configuration file since new properties are required.
diff --git a/config/test_component.config b/config/test_component.config
index 97311ed..fb6e341 100644
--- a/config/test_component.config
+++ b/config/test_component.config
@@ -56,6 +56,14 @@ AsteriskSCFIceStorm.Transient=1
AsteriskSCFIceStorm.Flush.Timeout=2000
TopicManager.Proxy=AsteriskSCFIceStorm/TopicManager:default -p 10000
+RtpStateReplicatorIceStorm.InstanceName=RtpStateReplicatorIceStorm
+RtpStateReplicatorIceStorm.TopicManager.Endpoints=default -p 10005
+RtpStateReplicatorIceStorm.Publish.Endpoints=default -p 10006
+RtpStateReplicatorIceStorm.Trace.TopicManager=2
+RtpStateReplicatorIceStorm.Transient=1
+RtpStateReplicatorIceStorm.Flush.Timeout=2000
+RtpStateReplicatorTopicManager.Proxy=RtpStateReplicatorIceStorm/TopicManager:default -p 10005
+
ServiceLocatorManagementAdapter.Endpoints=tcp -p 4422
ServiceLocatorAdapter.Endpoints=tcp -p 4411
ServiceLocatorLocalAdapter.Endpoints=tcp -p 4412
commit 4740d0813de6b66cd70000b8b7c10dd883520ac6
Author: Joshua Colp <jcolp at digium.com>
Date: Tue May 17 08:35:44 2011 -0300
Use a separate property for the RTP IceStorm so as to not conflict.
diff --git a/src/RtpStateReplicatorApp.cpp b/src/RtpStateReplicatorApp.cpp
index 6628c8d..123b368 100644
--- a/src/RtpStateReplicatorApp.cpp
+++ b/src/RtpStateReplicatorApp.cpp
@@ -274,7 +274,7 @@ void RtpStateReplicatorService::initialize(const string& appName, const Ice::Com
mIceStorm = new CollocatedIceStorm("RtpStateReplicatorIceStorm", ic->getProperties());
IceStorm::TopicManagerPrx topicManager = IceStorm::TopicManagerPrx::checkedCast(
- ic->propertyToProxy("TopicManager.Proxy"));
+ ic->propertyToProxy("RtpStateReplicatorTopicManager.Proxy"));
IceStorm::TopicPrx topic;
commit e04d914ecae9eb1c24a87fb18d8f93b46335c760
Author: Joshua Colp <jcolp at digium.com>
Date: Tue May 17 08:35:30 2011 -0300
I *always* screw up my assertions.
diff --git a/src/RTPSession.cpp b/src/RTPSession.cpp
index 2254f36..ebe0206 100644
--- a/src/RTPSession.cpp
+++ b/src/RTPSession.cpp
@@ -144,7 +144,7 @@ RTPSessionImpl::RTPSessionImpl(const Ice::ObjectAdapterPtr& adapter, const Forma
/* Create an endpoint in pjmedia for our media. */
pj_status_t status = pjmedia_endpt_create(factory, NULL, configurationService->getWorkerThreadCount(), &mImpl->mEndpoint);
- assert(status != PJ_SUCCESS);
+ assert(status == PJ_SUCCESS);
int minimumPort = configurationService->getStartPort();
int maximumPort = configurationService->getEndPort();
@@ -191,7 +191,7 @@ RTPSessionImpl::RTPSessionImpl(const Ice::ObjectAdapterPtr& adapter, pj_pool_fac
pj_status_t status = pjmedia_endpt_create(factory, NULL, configurationService->getWorkerThreadCount(), &mImpl->mEndpoint);
- assert(status != PJ_SUCCESS);
+ assert(status == PJ_SUCCESS);
if ((status = pjmedia_transport_udp_create2(mImpl->mEndpoint, "RTP", NULL, port, 0, &mImpl->mTransport))
!= PJ_SUCCESS)
commit 6eaa470cd582ad9231eb69aa335312a880cce9fe
Author: Joshua Colp <jcolp at digium.com>
Date: Mon May 16 15:44:43 2011 -0300
Update the main component so that it publishes the configuration service in standalone or registers the configuration service when running in a replica group.
diff --git a/src/MediaRTPpjmedia.cpp b/src/MediaRTPpjmedia.cpp
index 73cc651..97f3bbd 100644
--- a/src/MediaRTPpjmedia.cpp
+++ b/src/MediaRTPpjmedia.cpp
@@ -26,6 +26,7 @@
#include <AsteriskSCF/Core/Discovery/ServiceLocatorIf.h>
#include <AsteriskSCF/Media/MediaIf.h>
#include <AsteriskSCF/Media/RTP/MediaRTPIf.h>
+#include <AsteriskSCF/System/Component/ConfigurationIf.h>
#include <AsteriskSCF/System/Component/ComponentServiceIf.h>
#include <AsteriskSCF/System/Component/ReplicaIf.h>
#include <AsteriskSCF/Logger/IceLogger.h>
@@ -37,11 +38,13 @@
#include "RTPSession.h"
#include "RtpStateReplicator.h"
#include "RTPConfiguration.h"
+#include "RtpConfigurationIf.h"
using namespace std;
using namespace AsteriskSCF::Core::Discovery::V1;
using namespace AsteriskSCF::Media::V1;
using namespace AsteriskSCF::Media::RTP::V1;
+using namespace AsteriskSCF::System::Configuration::V1;
using namespace AsteriskSCF::System::Component::V1;
using namespace AsteriskSCF::System::Logging;
using namespace AsteriskSCF::Discovery;
@@ -233,6 +236,21 @@ private:
* An instance of the general state information class.
*/
RtpGeneralStateItemPtr mGeneralState;
+
+ /**
+ * A proxy to the service locator management service.
+ */
+ ServiceLocatorManagementPrx mManagement;
+
+ /**
+ * A proxy to the service locator manager for the configuration service.
+ */
+ ServiceManagementPrx mConfigurationManagement;
+
+ /**
+ * Unique guid for configuration service name comparator.
+ */
+ std::string mConfigCompareGuid;
};
/**
@@ -338,6 +356,28 @@ private:
};
/**
+ * Comparator implementation for name based configuration service locating
+ */
+class RtpConfigurationCompare : public ServiceLocatorParamsCompare
+{
+public:
+ RtpConfigurationCompare(const string& name) : mName(name) {}
+ bool isSupported(const ServiceLocatorParamsPtr ¶ms, const Ice::Current &)
+ {
+ RtpConfigurationParamsPtr configParams = RtpConfigurationParamsPtr::dynamicCast(params);
+ if (configParams->name == mName)
+ {
+ return true;
+ }
+ return false;
+ }
+private:
+ string mName;
+};
+
+typedef IceUtil::Handle<RtpConfigurationCompare> RtpConfigurationComparePtr;
+
+/**
* Constructor for the RTPMediaServiceImpl class.
*/
RTPMediaServiceImpl::RTPMediaServiceImpl(const Ice::ObjectAdapterPtr& adapter, const ReplicaPtr& replicaService,
@@ -405,7 +445,8 @@ void MediaRTPpjmediaApp::start(const std::string&, const Ice::CommunicatorPtr& c
mLocalAdapter->add(mReplicaService, mCommunicator->stringToIdentity(ReplicaServiceId));
mConfigurationService = new ConfigurationServiceImpl();
- mLocalAdapter->add(mConfigurationService, mCommunicator->stringToIdentity(ConfigurationServiceId));
+ ConfigurationServicePrx mConfigurationServiceProxy = ConfigurationServicePrx::uncheckedCast(
+ mLocalAdapter->add(mConfigurationService, mCommunicator->stringToIdentity(ConfigurationServiceId)));
mLocalAdapter->activate();
@@ -415,8 +456,7 @@ void MediaRTPpjmediaApp::start(const std::string&, const Ice::CommunicatorPtr& c
lg(Info) << "Activated pjmedia rtp component media service." << endl;
- ServiceLocatorManagementPrx management =
- ServiceLocatorManagementPrx::checkedCast(mCommunicator->propertyToProxy("ServiceLocatorManagementProxy"));
+ mManagement = ServiceLocatorManagementPrx::checkedCast(mCommunicator->propertyToProxy("ServiceLocatorManagementProxy"));
// The service locator is required for state replicator operation, so go ahead and find it
ServiceLocatorPrx locator = ServiceLocatorPrx::checkedCast(mCommunicator->propertyToProxy("LocatorService.Proxy"));
@@ -440,6 +480,27 @@ void MediaRTPpjmediaApp::start(const std::string&, const Ice::CommunicatorPtr& c
RTPMediaServiceImplPtr rtpmediaservice =
new RTPMediaServiceImpl(mGlobalAdapter, mReplicaService, mStateReplicator, mConfigurationService);
+ if (mCommunicator->getProperties()->getPropertyWithDefault("Rtp.Standalone", "false") == "true")
+ {
+ // Publish the configuration service IceStorm topic so everybody gets configuration
+ mConfigurationManagement = ServiceManagementPrx::uncheckedCast(
+ mManagement->addService(mConfigurationServiceProxy, ""));
+
+ // Populate the configuration parameters with details so we can be found
+ RtpConfigurationParamsPtr configurationParams = new RtpConfigurationParams();
+ configurationParams->category = ConfigurationDiscoveryCategory;
+ configurationParams->name = mCommunicator->getProperties()->getPropertyWithDefault("RtpConfiguration.Name", "");
+
+ // Add our custom comparator so we can support multiple simultaneous configuration sinks
+ RtpConfigurationComparePtr configNameCompare = new RtpConfigurationCompare(configurationParams->name);
+ ServiceLocatorParamsComparePrx configCompareProxy = ServiceLocatorParamsComparePrx::uncheckedCast(
+ mLocalAdapter->addWithUUID(configNameCompare));
+
+ mConfigCompareGuid = IceUtil::generateUUID();
+ mManagement->addCompare(mConfigCompareGuid, configCompareProxy);
+ mConfigurationManagement->addLocatorParams(configurationParams, mConfigCompareGuid);
+ }
+
if (mStateReplicator)
{
mReplicatorListener =
@@ -451,6 +512,10 @@ void MediaRTPpjmediaApp::start(const std::string&, const Ice::CommunicatorPtr& c
if (mCommunicator->getProperties()->getPropertyWithDefault("Rtp.StateReplicatorListener", "no") == "yes")
{
mStateReplicator->addListener(mReplicatorListenerProxy);
+ if (mCommunicator->getProperties()->getPropertyWithDefault("Rtp.Standalone", "false") == "false")
+ {
+ mStateReplicator->registerConfigurationService(mConfigurationServiceProxy);
+ }
mReplicaService->standby();
lg(Info) << "Operating as a standby replica." << endl;
}
@@ -468,7 +533,7 @@ void MediaRTPpjmediaApp::start(const std::string&, const Ice::CommunicatorPtr& c
if (mReplicaService->isActive() == true)
{
mGeneralState->mServiceManagement = ServiceManagementPrx::uncheckedCast(
- management->addService(RTPMediaServiceProxy, "media_rtp_pjmedia"));
+ mManagement->addService(RTPMediaServiceProxy, "media_rtp_pjmedia"));
/* Now we can add some parameters to help find us. */
genericparams->category = "rtp";
mGeneralState->mServiceManagement->addLocatorParams(genericparams, "");
@@ -481,7 +546,7 @@ void MediaRTPpjmediaApp::start(const std::string&, const Ice::CommunicatorPtr& c
/* Let's add the component service to the service locator first */
mComponentServiceManagement =
- ServiceManagementPrx::uncheckedCast(management->addService(ComponentServiceProxy, "media_rtp_pjmedia"));
+ ServiceManagementPrx::uncheckedCast(mManagement->addService(ComponentServiceProxy, "media_rtp_pjmedia"));
genericparams->category = "Component/media_rtp_pjmedia";
mComponentServiceManagement->addLocatorParams(genericparams, "");
@@ -503,7 +568,12 @@ void MediaRTPpjmediaApp::stop()
mComponentServiceManagement->unregister();
if (mReplicaService->isActive() == true)
{
- mGeneralState-> mServiceManagement->unregister();
+ mGeneralState->mServiceManagement->unregister();
+ }
+ mConfigurationManagement->unregister();
+ if (!mConfigCompareGuid.empty())
+ {
+ mManagement->removeCompare(mConfigCompareGuid);
}
mCommunicator->destroy();
}
commit 366fae18463169b226613285ba70842374c718ab
Author: Joshua Colp <jcolp at digium.com>
Date: Mon May 16 14:28:21 2011 -0300
Tada! The state replicator will now add the publisher for the IceStorm topic to the service locator.
diff --git a/local-slice/RtpConfigurationIf.ice b/local-slice/RtpConfigurationIf.ice
index 45bbd59..09fc822 100644
--- a/local-slice/RtpConfigurationIf.ice
+++ b/local-slice/RtpConfigurationIf.ice
@@ -15,7 +15,9 @@
*/
#pragma once
+
#include <Ice/BuiltinSequences.ice>
+#include <AsteriskSCF/Core/Discovery/ServiceLocatorIf.ice>
#include <AsteriskSCF/System/Component/ConfigurationIf.ice>
module AsteriskSCF
@@ -31,6 +33,22 @@ module RTP
module V1
{
/**
+ * Service locator category for finding the configuration service
+ */
+ const string ConfigurationDiscoveryCategory = "RtpConfiguration";
+
+ /**
+ * Service locator parameters class for discovering the configuration service
+ */
+ unsliceable class RtpConfigurationParams extends AsteriskSCF::Core::Discovery::V1::ServiceLocatorParams
+ {
+ /**
+ * Unique name for the configuration service
+ */
+ string name;
+ };
+
+ /**
* Local visitor class for visiting RTP configuration groups
*/
local class RtpConfigurationGroupVisitor extends AsteriskSCF::System::Configuration::V1::ConfigurationGroupVisitor
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 268aef1..15c7630 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -37,6 +37,7 @@ asterisk_scf_component_add_file(RtpStateReplicator RtpStateReplicator.h)
asterisk_scf_component_add_file(RtpStateReplicator CollocatedIceStorm.cpp)
asterisk_scf_component_add_file(RtpStateReplicator CollocatedIceStorm.h)
asterisk_scf_component_add_slice(RtpStateReplicator ../local-slice/RtpStateReplicationIf.ice)
+asterisk_scf_component_add_slice(RtpStateReplicator ../local-slice/RtpConfigurationIf.ice)
asterisk_scf_component_add_ice_libraries(RtpStateReplicator IceStorm)
asterisk_scf_component_add_boost_libraries(RtpStateReplicator thread date_time)
asterisk_scf_component_build_icebox(RtpStateReplicator)
diff --git a/src/RtpStateReplicatorApp.cpp b/src/RtpStateReplicatorApp.cpp
index 3ceeb35..6628c8d 100644
--- a/src/RtpStateReplicatorApp.cpp
+++ b/src/RtpStateReplicatorApp.cpp
@@ -26,6 +26,7 @@
#include <AsteriskSCF/Logger/IceLogger.h>
#include <AsteriskSCF/logger.h>
+#include "RtpConfigurationIf.h"
#include "RtpStateReplicator.h"
#include "CollocatedIceStorm.h"
@@ -67,7 +68,9 @@ private:
ComponentServicePtr mComponentService;
RtpStateReplicatorIPtr mStateReplicator;
CollocatedIceStormPtr mIceStorm;
- IceStorm::TopicPrx mConfigurationReplicationTopic;
+ Ice::ObjectPrx mConfigurationPublisher;
+ Discovery::V1::ServiceManagementPrx mConfigurationManagement;
+ std::string mConfigCompareGuid;
};
static const string ComponentServiceId("RtpStateReplicatorComponent");
@@ -121,10 +124,29 @@ private:
typedef IceUtil::Handle<RtpStateReplicatorCompare> RtpStateReplicatorComparePtr;
+class RtpConfigurationCompare : public ServiceLocatorParamsCompare
+{
+public:
+ RtpConfigurationCompare(const string& name) : mName(name) {}
+ bool isSupported(const ServiceLocatorParamsPtr ¶ms, const Ice::Current &)
+ {
+ RtpConfigurationParamsPtr configParams = RtpConfigurationParamsPtr::dynamicCast(params);
+ if (configParams->name == mName)
+ {
+ return true;
+ }
+ return false;
+ }
+private:
+ string mName;
+};
+
+typedef IceUtil::Handle<RtpConfigurationCompare> RtpConfigurationComparePtr;
+
class RtpStateReplicatorConfigI : public RtpStateReplicatorI
{
public:
- RtpStateReplicatorConfigI(IceStorm::TopicPrx& topic) : mConfigurationReplicationTopic(topic) { };
+ RtpStateReplicatorConfigI(const IceStorm::TopicPrx& topic) : mConfigurationReplicationTopic(topic) { };
void registerConfigurationService(const AsteriskSCF::System::Configuration::V1::ConfigurationServicePrx&, const Ice::Current&);
private:
IceStorm::TopicPrx mConfigurationReplicationTopic;
@@ -202,6 +224,24 @@ void RtpStateReplicatorService::registerWithServiceLocator(const Ice::Communicat
mServiceLocatorManagement->addCompare(compareGuid, compareProxy);
mStateReplicationManagement->addLocatorParams(discoveryParams, compareGuid);
+ // Publish the configuration service IceStorm topic so everybody gets configuration
+ mConfigurationManagement = ServiceManagementPrx::uncheckedCast(
+ mServiceLocatorManagement->addService(mConfigurationPublisher, ""));
+
+ // Populate the configuration parameters with details so we can be found
+ RtpConfigurationParamsPtr configurationParams = new RtpConfigurationParams();
+ configurationParams->category = ConfigurationDiscoveryCategory;
+ configurationParams->name = ic->getProperties()->getPropertyWithDefault("RtpConfiguration.Name", "");
+
+ // Add our custom comparator so we can support multiple simultaneous configuration sinks
+ RtpConfigurationComparePtr configNameCompare = new RtpConfigurationCompare(configurationParams->name);
+ ServiceLocatorParamsComparePrx configCompareProxy = ServiceLocatorParamsComparePrx::uncheckedCast(
+ mAdapter->addWithUUID(configNameCompare));
+
+ mConfigCompareGuid = IceUtil::generateUUID();
+ mServiceLocatorManagement->addCompare(mConfigCompareGuid, configCompareProxy);
+ mConfigurationManagement->addLocatorParams(configurationParams, mConfigCompareGuid);
+
// TBD... We may have other interfaces to publish to the Service Locator.
}
catch(...)
@@ -220,6 +260,8 @@ void RtpStateReplicatorService::deregisterFromServiceLocator()
try
{
mComponentServiceManagement->unregister();
+ mConfigurationManagement->unregister();
+ mServiceLocatorManagement->removeCompare(mConfigCompareGuid);
}
catch(...)
{
@@ -240,7 +282,7 @@ void RtpStateReplicatorService::initialize(const string& appName, const Ice::Com
{
try
{
- topic = topicManager->retrieve("ConfigurationReplication");
+ topic = topicManager->retrieve("ConfigurationReplication");
}
catch (const IceStorm::NoSuchTopic&)
{
@@ -254,6 +296,9 @@ void RtpStateReplicatorService::initialize(const string& appName, const Ice::Com
return;
}
}
+ // There is no cast here on purpose as this is just going to get passed to
+ // the service locator which just takes a plain ol' proxy anyway.
+ mConfigurationPublisher = topic->getPublisher();
}
else
{
commit c85c80143e620332470b3d82df17f4d9e65140b1
Author: Joshua Colp <jcolp at digium.com>
Date: Mon May 16 14:00:29 2011 -0300
Add ability to register configuration service with the state replicator.
diff --git a/local-slice/RtpStateReplicationIf.ice b/local-slice/RtpStateReplicationIf.ice
index 97db712..3d2b7c7 100644
--- a/local-slice/RtpStateReplicationIf.ice
+++ b/local-slice/RtpStateReplicationIf.ice
@@ -22,8 +22,7 @@
#include <AsteriskSCF/Media/MediaIf.ice>
#include <AsteriskSCF/Media/RTP/MediaRTPIf.ice>
#include <AsteriskSCF/Core/Discovery/ServiceLocatorIf.ice>
-
-#include "RtpConfigurationIf.ice"
+#include <AsteriskSCF/System/Component/ConfigurationIf.ice>
module AsteriskSCF
{
@@ -71,6 +70,7 @@ module V1
void removeState(Ice::StringSeq items);
idempotent RtpStateItemSeq getState(Ice::StringSeq itemKeys);
idempotent RtpStateItemSeq getAllState();
+ void registerConfigurationService(AsteriskSCF::System::Configuration::V1::ConfigurationService *service);
};
class RtpGeneralStateItem extends RtpStateItem
diff --git a/src/CollocatedIceStorm.cpp b/src/CollocatedIceStorm.cpp
new file mode 100644
index 0000000..84dbe37
--- /dev/null
+++ b/src/CollocatedIceStorm.cpp
@@ -0,0 +1,87 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2010, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+#include <Ice/Ice.h>
+#include <IceStorm/IceStorm.h>
+#include <assert.h>
+#include <algorithm>
+#include "CollocatedIceStorm.h"
+
+using namespace AsteriskSCF::Media::RTP;
+
+//
+// The idea behind this class is that it needs to access the entry point that IceBox would
+// have used and then invoke the methods that are needed to start and stop the IceStorm
+// service.
+//
+typedef IceBox::Service* (*FACTORY)(Ice::CommunicatorPtr);
+
+CollocatedIceStorm::CollocatedIceStorm(const std::string& namePrefix, const Ice::PropertiesPtr& properties) :
+ mStopped(false)
+{
+ //
+ // We create our own communicator to avoid issues with call order on shutdown.
+ //
+ Ice::InitializationData initData;
+ initData.properties = properties;
+ mCommunicator = Ice::initialize(initData);
+
+ std::string loadString = mCommunicator->getProperties()->getPropertyWithDefault("IceStorm.EntryPoint", "IceStormService:createIceStorm");
+
+ mLibrary = new IceUtilInternal::DynamicLibrary;
+ IceUtilInternal::DynamicLibrary::symbol_type entry = mLibrary->loadEntryPoint(loadString);
+ if(entry == 0)
+ {
+ throw mLibrary->getErrorMessage();
+ }
+ FACTORY factory = (FACTORY)entry;
+ mService = factory(mCommunicator);
+ assert(mService != 0);
+ Ice::StringSeq options;
+ mService->start(namePrefix, mCommunicator, options);
+}
+
+CollocatedIceStorm::~CollocatedIceStorm()
+{
+ if(!mStopped)
+ {
+ try
+ {
+ stop();
+ mCommunicator->destroy();
+ }
+ catch(...)
+ {
+ }
+ }
+}
+
+void CollocatedIceStorm::stop()
+{
+ //
+ // NOTE: there isn't any mutex protection here. It can be added later if needed, but at the moment multiple threads
+ // do not have access to this object instance.
+ //
+ if(!mStopped)
+ {
+ if(mService)
+ {
+ mService->stop();
+ }
+ mCommunicator->shutdown();
+ mCommunicator->waitForShutdown();
+ mStopped = true;
+ }
+}
diff --git a/src/CollocatedIceStorm.h b/src/CollocatedIceStorm.h
new file mode 100644
index 0000000..7481fa5
--- /dev/null
+++ b/src/CollocatedIceStorm.h
@@ -0,0 +1,60 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2010, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+#pragma once
+
+#include <IceUtil/DynamicLibrary.h>
+#include <Ice/Service.h>
+#include <IceBox/IceBox.h>
+#include <Ice/Ice.h>
+#include <IceStorm/IceStorm.h>
+#include <string>
+
+namespace AsteriskSCF
+{
+namespace Media
+{
+namespace RTP
+{
+
+/**
+ * A helper class that instantiates IceStorm in-process, removing the need to launch
+ * a separate process to access IceStorm services.
+ */
+
+class CollocatedIceStorm : public IceUtil::Shared
+{
+public:
+ CollocatedIceStorm(const std::string&, const Ice::PropertiesPtr&);
+ ~CollocatedIceStorm();
+
+ /**
+ * "nice" applications should explictly call stop !
+ */
+ void stop();
+
+private:
+ IceUtilInternal::DynamicLibraryPtr mLibrary;
+ IceBox::ServicePtr mService;
+ Ice::CommunicatorPtr mCommunicator;
+ bool mStopped;
+};
+
+typedef IceUtil::Handle<CollocatedIceStorm> CollocatedIceStormPtr;
+
+} /* end of RTP */
+} /* end of Media */
+} /* end of AsteriskSCF */
+
diff --git a/src/RtpStateReplicatorApp.cpp b/src/RtpStateReplicatorApp.cpp
index e4d6df9..3ceeb35 100644
--- a/src/RtpStateReplicatorApp.cpp
+++ b/src/RtpStateReplicatorApp.cpp
@@ -121,6 +121,34 @@ private:
typedef IceUtil::Handle<RtpStateReplicatorCompare> RtpStateReplicatorComparePtr;
+class RtpStateReplicatorConfigI : public RtpStateReplicatorI
+{
+public:
+ RtpStateReplicatorConfigI(IceStorm::TopicPrx& topic) : mConfigurationReplicationTopic(topic) { };
+ void registerConfigurationService(const AsteriskSCF::System::Configuration::V1::ConfigurationServicePrx&, const Ice::Current&);
+private:
+ IceStorm::TopicPrx mConfigurationReplicationTopic;
+};
+
+void RtpStateReplicatorConfigI::registerConfigurationService(const AsteriskSCF::System::Configuration::V1::ConfigurationServicePrx& service, const Ice::Current&)
+{
+ if (mConfigurationReplicationTopic)
+ {
+ IceStorm::QoS qos;
+ qos["reliability"] = "ordered";
+
+ try
+ {
+ mConfigurationReplicationTopic->subscribeAndGetPublisher(qos, service);
+ }
+ catch (const IceStorm::AlreadySubscribed&)
+ {
+ // This is perfectly okay actually, it just means what they wanted us to do
+ // is already done.
+ }
+ }
+}
+
/**
* Register this component's primary public interfaces with the Service Locator.
* This enables other Asterisk SCF components to locate our interfaces.
@@ -206,17 +234,19 @@ void RtpStateReplicatorService::initialize(const string& appName, const Ice::Com
IceStorm::TopicManagerPrx topicManager = IceStorm::TopicManagerPrx::checkedCast(
ic->propertyToProxy("TopicManager.Proxy"));
+ IceStorm::TopicPrx topic;
+
if (topicManager)
{
try
{
- mConfigurationReplicationTopic = topicManager->retrieve("ConfigurationReplication");
+ topic = topicManager->retrieve("ConfigurationReplication");
}
catch (const IceStorm::NoSuchTopic&)
{
try
{
- mConfigurationReplicationTopic = topicManager->create("ConfigurationReplication");
+ topic = topicManager->create("ConfigurationReplication");
}
catch (const IceStorm::TopicExists&)
{
@@ -240,7 +270,7 @@ void RtpStateReplicatorService::initialize(const string& appName, const Ice::Com
// Create and publish our ComponentService interface support.
mComponentService = new ComponentServiceImpl(*this);
mAdapter->add(mComponentService, ic->stringToIdentity(ComponentServiceId));
- mStateReplicator = new RtpStateReplicatorI();
+ mStateReplicator = new RtpStateReplicatorConfigI(topic);
mAdapter->add(mStateReplicator, ic->stringToIdentity(ServiceDiscoveryId));
mAdapter->activate();
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index c13ab44..c82fb29 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -5,7 +5,6 @@ asterisk_scf_slice_include_directories(${API_SLICE_DIR})
asterisk_scf_component_init(media_rtp_pjmedia_test)
asterisk_scf_component_add_file(media_rtp_pjmedia_test TestRTPpjmedia.cpp)
asterisk_scf_component_add_slice(media_rtp_pjmedia_test ../local-slice/RtpStateReplicationIf.ice)
-asterisk_scf_component_add_slice(media_rtp_pjmedia_test ../local-slice/RtpConfigurationIf.ice)
asterisk_scf_component_add_boost_libraries(media_rtp_pjmedia_test unit_test_framework thread date_time)
asterisk_scf_component_build_icebox(media_rtp_pjmedia_test)
target_link_libraries(media_rtp_pjmedia_test asterisk-scf-api)
commit f27572008be8a6a37d8aaee4e8a7b29fb8e7339c
Author: Joshua Colp <jcolp at digium.com>
Date: Mon May 16 13:25:05 2011 -0300
Fix build issue.
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index c82fb29..c13ab44 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -5,6 +5,7 @@ asterisk_scf_slice_include_directories(${API_SLICE_DIR})
asterisk_scf_component_init(media_rtp_pjmedia_test)
asterisk_scf_component_add_file(media_rtp_pjmedia_test TestRTPpjmedia.cpp)
asterisk_scf_component_add_slice(media_rtp_pjmedia_test ../local-slice/RtpStateReplicationIf.ice)
+asterisk_scf_component_add_slice(media_rtp_pjmedia_test ../local-slice/RtpConfigurationIf.ice)
asterisk_scf_component_add_boost_libraries(media_rtp_pjmedia_test unit_test_framework thread date_time)
asterisk_scf_component_build_icebox(media_rtp_pjmedia_test)
target_link_libraries(media_rtp_pjmedia_test asterisk-scf-api)
commit d418b0865dc86ba0d4a4f522a05ae685d77fb14f
Merge: 014b586 d0f0dfa
Author: Joshua Colp <jcolp at digium.com>
Date: Mon May 16 13:23:54 2011 -0300
Merge branch 'master' into configuration-replication
Conflicts:
src/MediaRTPpjmedia.cpp
src/RTPSession.cpp
src/RTPSession.h
src/RtpStateReplicator.h
src/RtpStateReplicatorListener.cpp
diff --cc src/MediaRTPpjmedia.cpp
index c21e9d5,ab63f28..73cc651
--- a/src/MediaRTPpjmedia.cpp
+++ b/src/MediaRTPpjmedia.cpp
@@@ -62,8 -60,7 +62,8 @@@ class RTPMediaServiceImpl : public RTPM
{
public:
RTPMediaServiceImpl(const Ice::ObjectAdapterPtr&, const ReplicaPtr&,
- const AsteriskSCF::SmartProxy::SmartProxy<RtpStateReplicatorPrx>&,
- const AsteriskSCF::Discovery::SmartProxy<RtpStateReplicatorPrx>&);
++ const AsteriskSCF::Discovery::SmartProxy<RtpStateReplicatorPrx>&,
+ const ConfigurationServiceImplPtr&);
RTPSessionPrx allocate(const FormatSeq&, const Ice::Current&);
pj_pool_factory *getPoolFactory() { return &mCachingPool.factory; };
private:
@@@ -88,14 -85,9 +88,14 @@@
ReplicaPtr mReplicaService;
/**
+ * A pointer to the configuration service.
+ */
+ ConfigurationServiceImplPtr mConfigurationService;
+
+ /**
* A proxy to the state replicator.
*/
- AsteriskSCF::SmartProxy::SmartProxy<RtpStateReplicatorPrx> mStateReplicator;
+ AsteriskSCF::Discovery::SmartProxy<RtpStateReplicatorPrx> mStateReplicator;
};
/**
@@@ -341,10 -328,8 +341,10 @@@ private
* Constructor for the RTPMediaServiceImpl class.
*/
RTPMediaServiceImpl::RTPMediaServiceImpl(const Ice::ObjectAdapterPtr& adapter, const ReplicaPtr& replicaService,
- const AsteriskSCF::SmartProxy::SmartProxy<RtpStateReplicatorPrx>& stateReplicator,
- const AsteriskSCF::Discovery::SmartProxy<RtpStateReplicatorPrx>& stateReplicator) :
- mAdapter(adapter), mReplicaService(replicaService), mStateReplicator(stateReplicator)
++ const AsteriskSCF::Discovery::SmartProxy<RtpStateReplicatorPrx>& stateReplicator,
+ const ConfigurationServiceImplPtr& configurationService) :
+ mAdapter(adapter), mReplicaService(replicaService), mConfigurationService(configurationService),
+ mStateReplicator(stateReplicator)
{
/* Initialize the memory caching pool using default policy as specified by pjlib. */
pj_caching_pool_init(&mCachingPool, &pj_pool_factory_default_policy, 0);
diff --cc src/RTPSession.cpp
index 41df24f,2cddb39..2254f36
--- a/src/RTPSession.cpp
+++ b/src/RTPSession.cpp
@@@ -57,8 -54,8 +56,8 @@@ class RTPSessionImplPri
{
public:
RTPSessionImplPriv(const Ice::ObjectAdapterPtr& adapter, const FormatSeq& formats,
- const ReplicaPtr& replicaService,
- const AsteriskSCF::Discovery::SmartProxy<RtpStateReplicatorPrx>& stateReplicator) :
+ const ReplicaPtr& replicaService,
- const AsteriskSCF::SmartProxy::SmartProxy<RtpStateReplicatorPrx>& stateReplicator) :
++ const AsteriskSCF::Discovery::SmartProxy<RtpStateReplicatorPrx>& stateReplicator) :
mAdapter(adapter), mFormats(formats),
mSessionStateItem(new RtpSessionStateItem()),
mReplicaService(replicaService), mStateReplicator(stateReplicator) { };
@@@ -134,9 -131,8 +133,9 @@@
* Constructor for the RTPSessionImpl class (used by Ice).
*/
RTPSessionImpl::RTPSessionImpl(const Ice::ObjectAdapterPtr& adapter, const FormatSeq& formats,
- pj_pool_factory* factory, const ReplicaPtr& replicaService,
- const AsteriskSCF::Discovery::SmartProxy<RtpStateReplicatorPrx>& stateReplicator) :
+ pj_pool_factory* factory, const ReplicaPtr& replicaService,
- const AsteriskSCF::SmartProxy::SmartProxy<RtpStateReplicatorPrx>& stateReplicator,
++ const AsteriskSCF::Discovery::SmartProxy<RtpStateReplicatorPrx>& stateReplicator,
+ const ConfigurationServiceImplPtr& configurationService) :
mImpl(new RTPSessionImplPriv(adapter, formats, replicaService, stateReplicator))
{
/* Add ourselves to the ICE ASM so we can be used. */
@@@ -184,9 -180,9 +183,9 @@@
* Constructor for the RTPSessionImpl class (used by state replicator).
*/
RTPSessionImpl::RTPSessionImpl(const Ice::ObjectAdapterPtr& adapter, pj_pool_factory* factory,
- const Ice::Identity& sessionIdentity, const Ice::Identity& sinkIdentity, const Ice::Identity& sourceIdentity,
- Ice::Int port, const FormatSeq& formats) :
+ const Ice::Identity& sessionIdentity, const Ice::Identity& sinkIdentity, const Ice::Identity& sourceIdentity,
+ Ice::Int port, const FormatSeq& formats, const ConfigurationServiceImplPtr& configurationService) :
- mImpl(new RTPSessionImplPriv(adapter, formats, 0, *(new AsteriskSCF::SmartProxy::SmartProxy<RtpStateReplicatorPrx>)))
+ mImpl(new RTPSessionImplPriv(adapter, formats, 0, *(new AsteriskSCF::Discovery::SmartProxy<RtpStateReplicatorPrx>)))
{
mImpl->mProxy = RTPSessionPrx::uncheckedCast(adapter->add(this, sessionIdentity));
diff --cc src/RTPSession.h
index 46d596d,1703cd0..d783249
--- a/src/RTPSession.h
+++ b/src/RTPSession.h
@@@ -9,9 -9,8 +9,10 @@@
#pragma once
#include <boost/shared_ptr.hpp>
+ #include <AsteriskSCF/Discovery/SmartProxy.h>
+#include "RTPConfiguration.h"
+
/**
* Forward definition for our private implementation of RTPSession.
*/
@@@ -44,13 -43,10 +45,13 @@@ class RTPSessionImpl : public AsteriskS
{
public:
RTPSessionImpl(const Ice::ObjectAdapterPtr&, const AsteriskSCF::Media::V1::FormatSeq&,
- pj_pool_factory*, const AsteriskSCF::System::Component::V1::ReplicaPtr&,
- const AsteriskSCF::Discovery::SmartProxy<AsteriskSCF::Media::RTP::V1::RtpStateReplicatorPrx>&);
+ pj_pool_factory*, const AsteriskSCF::System::Component::V1::ReplicaPtr&,
- const AsteriskSCF::SmartProxy::SmartProxy<AsteriskSCF::Media::RTP::V1::RtpStateReplicatorPrx>&,
++ const AsteriskSCF::Discovery::SmartProxy<AsteriskSCF::Media::RTP::V1::RtpStateReplicatorPrx>&,
+ const ConfigurationServiceImplPtr&
+ );
RTPSessionImpl(const Ice::ObjectAdapterPtr&, pj_pool_factory*, const Ice::Identity&, const Ice::Identity&,
- const Ice::Identity&, Ice::Int, const AsteriskSCF::Media::V1::FormatSeq&);
+ const Ice::Identity&, Ice::Int, const AsteriskSCF::Media::V1::FormatSeq&,
+ const ConfigurationServiceImplPtr&);
AsteriskSCF::Media::V1::StreamSourceSeq getSources(const Ice::Current&);
AsteriskSCF::Media::V1::StreamSinkSeq getSinks(const Ice::Current&);
std::string getId(const Ice::Current&);
diff --cc src/RtpStateReplicator.h
index 3e04e0c,4ab9e64..62b4940
--- a/src/RtpStateReplicator.h
+++ b/src/RtpStateReplicator.h
@@@ -18,26 -18,30 +18,33 @@@
#include <Ice/Ice.h>
- #include <AsteriskSCF/StateReplicator.h>
-
+ #include <AsteriskSCF/Replication/StateReplicator.h>
#include "RtpStateReplicationIf.h"
+#include "RTPConfiguration.h"
+
- using namespace AsteriskSCF::Media::RTP::V1;
+ #include <boost/shared_ptr.hpp>
+
+ typedef AsteriskSCF::Replication::StateReplicator<
+ AsteriskSCF::Media::RTP::V1::RtpStateReplicator,
+ AsteriskSCF::Media::RTP::V1::RtpStateItemPtr,
+ std::string, AsteriskSCF::Media::RTP::V1::RtpStateReplicatorListenerPrx> RtpStateReplicatorI;
- typedef AsteriskSCF::StateReplication::StateReplicator<RtpStateReplicator, RtpStateItemPtr, std::string,
- RtpStateReplicatorListenerPrx> RtpStateReplicatorI;
typedef IceUtil::Handle<RtpStateReplicatorI> RtpStateReplicatorIPtr;
- class RtpStateReplicatorListenerI : public RtpStateReplicatorListener
+ //
+ // Forward declaration.
+ //
+ struct RtpStateReplicatorListenerImpl;
+
+ class RtpStateReplicatorListenerI : public AsteriskSCF::Media::RTP::V1::RtpStateReplicatorListener
{
public:
- RtpStateReplicatorListenerI(const Ice::ObjectAdapterPtr&, pj_pool_factory*, const RtpGeneralStateItemPtr&,
- RtpStateReplicatorListenerI(const Ice::ObjectAdapterPtr&, pj_pool_factory*,
- const AsteriskSCF::Media::RTP::V1::RtpGeneralStateItemPtr&);
++ RtpStateReplicatorListenerI(const Ice::ObjectAdapterPtr&, pj_pool_factory*, const AsteriskSCF::Media::RTP::V1::RtpGeneralStateItemPtr&,
+ const ConfigurationServiceImplPtr&);
+ ~RtpStateReplicatorListenerI();
void stateRemoved(const Ice::StringSeq&, const Ice::Current&);
- void stateSet(const RtpStateItemSeq&, const Ice::Current&);
+ void stateSet(const AsteriskSCF::Media::RTP::V1::RtpStateItemSeq&, const Ice::Current&);
bool operator==(const RtpStateReplicatorListenerI &rhs);
private:
- struct RtpStateReplicatorListenerImpl *mImpl;
+ boost::shared_ptr<RtpStateReplicatorListenerImpl> mImpl;
};
diff --cc src/RtpStateReplicatorListener.cpp
index d610af8,fd5694c..c4e78b9
--- a/src/RtpStateReplicatorListener.cpp
+++ b/src/RtpStateReplicatorListener.cpp
@@@ -152,13 -146,9 +150,12 @@@ public
};
RtpStateReplicatorListenerI::RtpStateReplicatorListenerI(const Ice::ObjectAdapterPtr& adapter,
- pj_pool_factory *poolFactory, const RtpGeneralStateItemPtr& generalState)
- : mImpl(new RtpStateReplicatorListenerImpl(adapter, poolFactory, generalState))
+ pj_pool_factory *poolFactory, const RtpGeneralStateItemPtr& generalState,
+ const ConfigurationServiceImplPtr& configurationService)
+ : mImpl(new RtpStateReplicatorListenerImpl(adapter, poolFactory, generalState, configurationService)) {}
+
+RtpStateReplicatorListenerI::~RtpStateReplicatorListenerI()
{
- delete mImpl;
}
void RtpStateReplicatorListenerI::stateRemoved(const Ice::StringSeq& itemKeys, const Ice::Current&)
commit 014b58607f05d3c99be3e5edf26a81545a04ed52
Author: Joshua Colp <jcolp at digium.com>
Date: Mon May 16 13:17:24 2011 -0300
Add current progress for configuration replication.
diff --git a/local-slice/RtpStateReplicationIf.ice b/local-slice/RtpStateReplicationIf.ice
index 254b50e..97db712 100644
--- a/local-slice/RtpStateReplicationIf.ice
+++ b/local-slice/RtpStateReplicationIf.ice
@@ -23,6 +23,8 @@
#include <AsteriskSCF/Media/RTP/MediaRTPIf.ice>
#include <AsteriskSCF/Core/Discovery/ServiceLocatorIf.ice>
+#include "RtpConfigurationIf.ice"
+
module AsteriskSCF
{
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 175d1b2..21931a7 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -35,6 +35,8 @@ asterisk_scf_component_install(media_rtp_pjmedia)
asterisk_scf_component_init(RtpStateReplicator)
asterisk_scf_component_add_file(RtpStateReplicator RtpStateReplicatorApp.cpp)
asterisk_scf_component_add_file(RtpStateReplicator RtpStateReplicator.h)
+asterisk_scf_component_add_file(RtpStateReplicator CollocatedIceStorm.cpp)
+asterisk_scf_component_add_file(RtpStateReplicator CollocatedIceStorm.h)
asterisk_scf_component_add_slice(RtpStateReplicator ../local-slice/RtpStateReplicationIf.ice)
asterisk_scf_component_add_ice_libraries(RtpStateReplicator IceStorm)
asterisk_scf_component_add_boost_libraries(RtpStateReplicator thread date_time)
diff --git a/src/RtpStateReplicatorApp.cpp b/src/RtpStateReplicatorApp.cpp
index 78e8ae1..f91fa18 100644
--- a/src/RtpStateReplicatorApp.cpp
+++ b/src/RtpStateReplicatorApp.cpp
@@ -27,6 +27,7 @@
#include <AsteriskSCF/logger.h>
#include "RtpStateReplicator.h"
+#include "CollocatedIceStorm.h"
using namespace std;
using namespace AsteriskSCF::Core;
@@ -34,6 +35,7 @@ using namespace AsteriskSCF::Core::Discovery::V1;
using namespace AsteriskSCF::System::Component::V1;
using namespace AsteriskSCF::System::Logging;
using namespace AsteriskSCF::Media::RTP::V1;
+using namespace AsteriskSCF::Media::RTP;
namespace
{
@@ -64,6 +66,8 @@ private:
ConfiguredIceLoggerPtr mIceLogger;
ComponentServicePtr mComponentService;
RtpStateReplicatorIPtr mStateReplicator;
+ CollocatedIceStormPtr mIceStorm;
+ IceStorm::TopicPrx mConfigurationReplicationTopic;
};
static const string ComponentServiceId("RtpStateReplicatorComponent");
@@ -197,6 +201,35 @@ void RtpStateReplicatorService::deregisterFromServiceLocator()
void RtpStateReplicatorService::initialize(const string& appName, const Ice::CommunicatorPtr& ic)
{
+ mIceStorm = new CollocatedIceStorm("RtpStateReplicatorIceStorm", ic->getProperties());
+
+ IceStorm::TopicManagerPrx topicManager = IceStorm::TopicManagerPrx::checkedCast(
+ ic->propertyToProxy("TopicManager.Proxy"));
+
+ if (topicManager)
+ {
+ try
+ {
+ mConfigurationReplicationTopic = topicManager->retrieve("ConfigurationReplication");
+ }
+ catch (const IceStorm::NoSuchTopic&)
+ {
+ try
+ {
+ mConfigurationReplicationTopic = topicManager->create("ConfigurationReplication");
+ }
+ catch (const IceStorm::TopicExists&)
+ {
+ lg(Error) << "Oh snap! Race condition creating topic, aborting";
+ return;
+ }
+ }
+ }
+ else
+ {
+ lg(Info) << "IceStorm topic manager proxy not present, unable to perform configuration replication.";
+ }
+
mAdapter = ic->createObjectAdapter("RtpStateReplicator");
// setup logging client
@@ -225,6 +258,9 @@ void RtpStateReplicatorService::stop()
{
// Remove our interfaces from the service locator.
deregisterFromServiceLocator();
+
+ // Stop our local IceStorm instance
+ mIceStorm->stop();
}
extern "C"
commit 8c950ee6637816c1a07b9864a8a9fdb627cb632c
Author: Joshua Colp <jcolp at digium.com>
Date: Wed May 11 08:45:03 2011 -0300
Incorporate feedback from code review.
diff --git a/config/RtpConfigurator.py b/config/RtpConfigurator.py
index 983144d..87b3f65 100755
--- a/config/RtpConfigurator.py
+++ b/config/RtpConfigurator.py
@@ -34,11 +34,11 @@ class RtpSectionVisitors(Configurator.SectionVisitors):
mapper = Configurator.OptionMapper()
portsItem = AsteriskSCF.Media.RTP.V1.PortRangesItem()
- mapper.map('startport', portsItem, 'startPort', 'ports', config.getint, 10000)
- mapper.map('endport', portsItem, 'endPort', 'ports', config.getint, 20000)
+ mapper.map('startport', portsItem, 'startPort', AsteriskSCF.Media.RTP.V1.PortRangesItemName, config.getint, 10000)
+ mapper.map('endport', portsItem, 'endPort', AsteriskSCF.Media.RTP.V1.PortRangesItemName, config.getint, 20000)
workerItem = AsteriskSCF.Media.RTP.V1.WorkerThreadCountItem()
- mapper.map('workerthreadcount', workerItem, 'count', 'workerThreadCount', config.getint, 4)
+ mapper.map('workerthreadcount', workerItem, 'count', AsteriskSCF.Media.RTP.V1.WorkerThreadCountItemName, config.getint, 4)
for option in config.options(section):
mapper.execute(group, section, option)
diff --git a/local-slice/RtpConfigurationIf.ice b/local-slice/RtpConfigurationIf.ice
index c04c07b..45bbd59 100644
--- a/local-slice/RtpConfigurationIf.ice
+++ b/local-slice/RtpConfigurationIf.ice
@@ -66,9 +66,15 @@ module V1
};
/**
- * Port ranges configuartion item
+ * Name that the port ranges configuration item should be inserted as
+ */
+ const string PortRangesItemName = "ports";
+
+ /**
+ * Port ranges configuration item
*
- * This must be added to the general configuration group using the name ports
+ * This must be added to the general configuration group using the constant string
+ * in PortRangesItemName
*
*/
class PortRangesItem extends RtpConfigurationItem
@@ -91,9 +97,15 @@ module V1
};
/**
+ * Name that the worker thread count configuration item should be inserted as
+ */
+ const string WorkerThreadCountItemName = "workerThreadCount";
+
+ /**
* Worker thread count for incoming media configuration item
*
- * This must be added to the general configuration group using the name workerThreadCount
+ * This must be added to the general configuration group using the constant string
+ * in WorkerThreadCountItemName
*
*/
class WorkerThreadCountItem extends RtpConfigurationItem
diff --git a/src/RTPConfiguration.cpp b/src/RTPConfiguration.cpp
index bac536f..1fef9f5 100644
--- a/src/RTPConfiguration.cpp
+++ b/src/RTPConfiguration.cpp
@@ -18,6 +18,7 @@
#include <boost/thread.hpp>
#include <boost/shared_ptr.hpp>
+#include <boost/thread/shared_mutex.hpp>
#include <AsteriskSCF/System/Component/ConfigurationIf.h>
@@ -34,6 +35,11 @@ public:
* General RTP configuration
*/
RtpGeneralGroupPtr mGeneralGroup;
+
+ /**
+ * Shared mutex lock which protects the configuration
+ */
+ boost::shared_mutex mLock;
};
ConfigurationServiceImpl::ConfigurationServiceImpl() : mImplPriv(new ConfigurationServiceImplPriv())
@@ -42,59 +48,59 @@ ConfigurationServiceImpl::ConfigurationServiceImpl() : mImplPriv(new Configurati
ConfigurationGroupSeq ConfigurationServiceImpl::getConfiguration(const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq& groups, const Ice::Current&)
{
- class visitor : public RtpConfigurationGroupVisitor
+ class GroupVisitor : public RtpConfigurationGroupVisitor
{
public:
- visitor(boost::shared_ptr<ConfigurationServiceImplPriv> implPriv, ConfigurationGroupSeq& visitorGroups) : mImplPriv(implPriv), mGroups(visitorGroups) { };
+ GroupVisitor(boost::shared_ptr<ConfigurationServiceImplPriv> implPriv, ConfigurationGroupSeq& visitorGroups) : mImplPriv(implPriv), mGroups(visitorGroups) { };
private:
- /**
- * Internal helper function which determines what configuration items should be returned
- */
- void insertRequestedConfigurationItems(ConfigurationItemDict& requestedItems,
- ConfigurationItemDict& localItems,
- ConfigurationItemDict& returnedItems)
- {
-
- for (ConfigurationItemDict::iterator requestedItem = requestedItems.begin();
- requestedItem != requestedItems.end();
- ++requestedItem)
- {
- ConfigurationItemDict::iterator localItem = localItems.find((*requestedItem).first);
-
- if (localItem == localItems.end())
- {
- continue;
- }
-
- returnedItems.insert(make_pair((*requestedItem).first, (*requestedItem).second));
- }
- }
-
- void visitRtpGeneralGroup(const ::AsteriskSCF::Media::RTP::V1::RtpGeneralGroupPtr& group)
- {
- if (!mImplPriv->mGeneralGroup)
- {
- return;
- }
-
- RtpGeneralGroupPtr returnedGroup = new RtpGeneralGroup();
-
- insertRequestedConfigurationItems(group->configurationItems, mImplPriv->mGeneralGroup->configurationItems, returnedGroup->configurationItems);
-
- mGroups.push_back(returnedGroup);
- };
-
- boost::shared_ptr<ConfigurationServiceImplPriv> mImplPriv;
- ConfigurationGroupSeq& mGroups;
+ /**
+ * Internal helper function which determines what configuration items should be returned
+ */
+ void insertRequestedConfigurationItems(const ConfigurationItemDict& requestedItems,
+ const ConfigurationItemDict& localItems,
+ ConfigurationItemDict& returnedItems)
+ {
+
+ boost::shared_lock<boost::shared_mutex> lock(mImplPriv->mLock);
+
+ for (ConfigurationItemDict::const_iterator requestedItem = requestedItems.cbegin();
+ requestedItem != requestedItems.end();
+ ++requestedItem)
+ {
+ ConfigurationItemDict::const_iterator localItem = localItems.find(requestedItem->first);
+
+ if (localItem != localItems.end())
+ {
+ returnedItems.insert((*requestedItem));
+ }
+ }
+ }
+
+ void visitRtpGeneralGroup(const ::AsteriskSCF::Media::RTP::V1::RtpGeneralGroupPtr& group)
+ {
+ if (!mImplPriv->mGeneralGroup)
+ {
+ return;
+ }
+
+ RtpGeneralGroupPtr returnedGroup = new RtpGeneralGroup();
+
+ insertRequestedConfigurationItems(group->configurationItems, mImplPriv->mGeneralGroup->configurationItems, returnedGroup->configurationItems);
+
+ mGroups.push_back(returnedGroup);
+ };
+
+ boost::shared_ptr<ConfigurationServiceImplPriv> mImplPriv;
+ ConfigurationGroupSeq& mGroups;
};
ConfigurationGroupSeq newGroups;
- RtpConfigurationGroupVisitorPtr v = new visitor(mImplPriv, newGroups);
+ RtpConfigurationGroupVisitorPtr v = new GroupVisitor(mImplPriv, newGroups);
for (ConfigurationGroupSeq::const_iterator group = groups.begin(); group != groups.end(); ++group)
{
- (*group)->visit(v);
+ (*group)->visit(v);
}
return newGroups;
@@ -102,33 +108,35 @@ ConfigurationGroupSeq ConfigurationServiceImpl::getConfiguration(const AsteriskS
ConfigurationGroupSeq ConfigurationServiceImpl::getConfigurationAll(const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq& groups, const Ice::Current&)
{
- class visitor : public RtpConfigurationGroupVisitor
+ class GroupVisitor : public RtpConfigurationGroupVisitor
{
public:
- visitor(boost::shared_ptr<ConfigurationServiceImplPriv> implPriv, ConfigurationGroupSeq& visitorGroups) :
- mImplPriv(implPriv), mGroups(visitorGroups) { };
-
+ GroupVisitor(boost::shared_ptr<ConfigurationServiceImplPriv> implPriv, ConfigurationGroupSeq& visitorGroups) :
+ mImplPriv(implPriv), mGroups(visitorGroups) { };
+
private:
- void visitRtpGeneralGroup(const ::AsteriskSCF::Media::RTP::V1::RtpGeneralGroupPtr&)
- {
- if (!mImplPriv->mGeneralGroup)
- {
- return;
- }
-
- mGroups.push_back(mImplPriv->mGeneralGroup);
- };
-
- boost::shared_ptr<ConfigurationServiceImplPriv> mImplPriv;
- ConfigurationGroupSeq& mGroups;
+ void visitRtpGeneralGroup(const ::AsteriskSCF::Media::RTP::V1::RtpGeneralGroupPtr&)
+ {
+ if (!mImplPriv->mGeneralGroup)
+ {
+ return;
+ }
+
+ mGroups.push_back(mImplPriv->mGeneralGroup);
+ };
+
+ boost::shared_ptr<ConfigurationServiceImplPriv> mImplPriv;
+ ConfigurationGroupSeq& mGroups;
};
ConfigurationGroupSeq newGroups;
- RtpConfigurationGroupVisitorPtr v = new visitor(mImplPriv, newGroups);
+ RtpConfigurationGroupVisitorPtr v = new GroupVisitor(mImplPriv, newGroups);
+
+ boost::shared_lock<boost::shared_mutex> lock(mImplPriv->mLock);
for (ConfigurationGroupSeq::const_iterator group = groups.begin(); group != groups.end(); ++group)
{
- (*group)->visit(v);
+ (*group)->visit(v);
}
return newGroups;
@@ -137,11 +145,13 @@ ConfigurationGroupSeq ConfigurationServiceImpl::getConfigurationAll(const Asteri
ConfigurationGroupSeq ConfigurationServiceImpl::getConfigurationGroups(const Ice::Current&)
{
ConfigurationGroupSeq groups;
-
+
+ boost::shared_lock<boost::shared_mutex> lock(mImplPriv->mLock);
+
if (mImplPriv->mGeneralGroup)
{
- RtpGeneralGroupPtr general = new RtpGeneralGroup();
- groups.push_back(general);
+ RtpGeneralGroupPtr general = new RtpGeneralGroup();
+ groups.push_back(general);
}
return groups;
@@ -149,147 +159,155 @@ ConfigurationGroupSeq ConfigurationServiceImpl::getConfigurationGroups(const Ice
void ConfigurationServiceImpl::setConfiguration(const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq& groups, const Ice::Current&)
{
- class groupsVisitor : public RtpConfigurationGroupVisitor
+ class GroupVisitor : public RtpConfigurationGroupVisitor
{
public:
- groupsVisitor(boost::shared_ptr<ConfigurationServiceImplPriv> implPriv) : mImplPriv(implPriv) { };
-
+ GroupVisitor(boost::shared_ptr<ConfigurationServiceImplPriv> implPriv) : mImplPriv(implPriv) { };
+
private:
- /**
- * Helper function which performs serial number checking of items
- */
- void performSerialCheck(ConfigurationItemDict& changedItems, ConfigurationItemDict& localItems)
- {
- for (ConfigurationItemDict::iterator item = changedItems.begin();
- item != changedItems.end();
- ++item)
- {
- // If serial checking is to be skipped for this item just skip over it
- if ((*item).second->serialNumber == -1)
- {
- continue;
- }
-
- ConfigurationItemDict::iterator localItem = localItems.find((*item).first);
-
- if (localItem == localItems.end())
- {
- // This is a new item so serial checking does not apply
- continue;
- }
-
- if ((*item).second->serialNumber < (*localItem).second->serialNumber)
- {
- /* XXX Need to throw the exception */
- }
- }
- }
-
- void visitRtpGeneralGroup(const ::AsteriskSCF::Media::RTP::V1::RtpGeneralGroupPtr& group)
- {
- if (!mImplPriv->mGeneralGroup)
- {
- mImplPriv->mGeneralGroup = new RtpGeneralGroup();
- }
- else
- {
- performSerialCheck(group->configurationItems, mImplPriv->mGeneralGroup->configurationItems);
- }
-
- for (ConfigurationItemDict::const_iterator item = group->configurationItems.begin();
- item != group->configurationItems.end();
- ++item)
- {
- mImplPriv->mGeneralGroup->configurationItems.insert(make_pair((*item).first, (*item).second));
- }
- }
-
- boost::shared_ptr<ConfigurationServiceImplPriv> mImplPriv;
+ /**
+ * Helper function which performs serial number checking of items
+ */
+ void performSerialCheck(const ConfigurationItemDict& changedItems, const ConfigurationItemDict& localItems,
+ const AsteriskSCF::System::Configuration::V1::ConfigurationGroupPtr& group)
+ {
+ for (ConfigurationItemDict::const_iterator item = changedItems.cbegin();
+ item != changedItems.end();
+ ++item)
+ {
+ // If serial checking is to be skipped for this item just skip over it
+ if (item->second->serialNumber == -1)
+ {
+ continue;
+ }
+
+ ConfigurationItemDict::const_iterator localItem = localItems.find(item->first);
+
+ if (localItem == localItems.end())
+ {
+ // This is a new item so serial checking does not apply
+ continue;
+ }
+
+ if (item->second->serialNumber < localItem->second->serialNumber)
+ {
+ throw SerialConflict(group, item->second);
+ }
+ }
+ }
+
+ void visitRtpGeneralGroup(const ::AsteriskSCF::Media::RTP::V1::RtpGeneralGroupPtr& group)
+ {
+ if (!mImplPriv->mGeneralGroup)
+ {
+ mImplPriv->mGeneralGroup = new RtpGeneralGroup();
+ }
+ else
+ {
+ performSerialCheck(group->configurationItems, mImplPriv->mGeneralGroup->configurationItems, group);
+ }
+
+ for (ConfigurationItemDict::const_iterator item = group->configurationItems.begin();
+ item != group->configurationItems.end();
+ ++item)
+ {
+ mImplPriv->mGeneralGroup->configurationItems.erase(item->first);
+ mImplPriv->mGeneralGroup->configurationItems.insert((*item));
+ }
+ }
+
+ boost::shared_ptr<ConfigurationServiceImplPriv> mImplPriv;
};
- RtpConfigurationGroupVisitorPtr v = new groupsVisitor(mImplPriv);
+ RtpConfigurationGroupVisitorPtr v = new GroupVisitor(mImplPriv);
+
+ boost::unique_lock<boost::shared_mutex> lock(mImplPriv->mLock);
for (ConfigurationGroupSeq::const_iterator group = groups.begin(); group != groups.end(); ++group)
{
- (*group)->visit(v);
+ (*group)->visit(v);
}
}
void ConfigurationServiceImpl::removeConfigurationItems(const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq& groups, const Ice::Current&)
{
- class groupsVisitor : public RtpConfigurationGroupVisitor
+ class GroupVisitor : public RtpConfigurationGroupVisitor
{
public:
- groupsVisitor(boost::shared_ptr<ConfigurationServiceImplPriv> implPriv) : mImplPriv(implPriv) { };
-
- void removeItems(RtpConfigurationItemVisitor* visitor, ConfigurationItemDict& itemsToRemove,
- ConfigurationItemDict& localItems)
- {
- for (ConfigurationItemDict::const_iterator item = itemsToRemove.begin();
- item != itemsToRemove.end();
- ++item)
- {
- ConfigurationItemDict::iterator localItem = localItems.find((*item).first);
- if (localItem == localItems.end())
- {
- continue;
- }
- if (visitor != 0)
- {
- (*item).second->visit(visitor);
- }
- localItems.erase(localItem);
- }
- }
-
- void visitRtpGeneralGroup(const ::AsteriskSCF::Media::RTP::V1::RtpGeneralGroupPtr& group)
- {
- if (!mImplPriv->mGeneralGroup)
- {
- return;
- }
-
- removeItems(0, group->configurationItems, mImplPriv->mGeneralGroup->configurationItems);
- };
-
+ GroupVisitor(boost::shared_ptr<ConfigurationServiceImplPriv> implPriv) : mImplPriv(implPriv) { };
+
+ void removeItems(RtpConfigurationItemVisitor* visitor, ConfigurationItemDict& itemsToRemove,
+ ConfigurationItemDict& localItems)
+ {
+ for (ConfigurationItemDict::const_iterator item = itemsToRemove.begin();
+ item != itemsToRemove.end();
+ ++item)
+ {
+ ConfigurationItemDict::iterator localItem = localItems.find(item->first);
+ if (localItem == localItems.end())
+ {
+ continue;
+ }
+ if (visitor != 0)
+ {
+ item->second->visit(visitor);
+ }
+ localItems.erase(localItem);
+ }
+ }
+
+ void visitRtpGeneralGroup(const ::AsteriskSCF::Media::RTP::V1::RtpGeneralGroupPtr& group)
... 1343 lines suppressed ...
--
asterisk-scf/release/media_rtp_pjmedia.git
More information about the asterisk-scf-commits
mailing list