[asterisk-scf-commits] asterisk-scf/integration/media_rtp_pjmedia.git branch "configuration-replication" updated.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Mon May 23 09:42:30 CDT 2011
branch "configuration-replication" has been updated
via 1476d4d52c1cf76a287c34031310d3c71b30bf5b (commit)
via 468c255b5ece37590cdbdd1121541dfc305f2d95 (commit)
via 2fe8c6154989313a4f40eb45cccd5b946f8f495b (commit)
via 103929b269e3391fa36b4d34c81ca96efa5eaaef (commit)
via d77ecdc4c1ed57239a797b59be19f9841085cbe4 (commit)
via 10900df1422c8cdd3c43173863a6c7f8087f9801 (commit)
from 70dab79e95c3da49b6a1ea7ed8564530fd57a389 (commit)
Summary of changes:
...t_component.config => test_component_v6.config} | 10 +--
config/test_media_rtp_pjmedia.conf | 2 +-
local-slice/RtpStateReplicationIf.ice | 3 +-
src/MediaRTPpjmedia.cpp | 64 +++++++++++--
src/RTPSession.cpp | 18 +++-
src/RTPSession.h | 7 +-
src/RTPSink.cpp | 22 ++++-
src/RTPSource.cpp | 33 +++++--
src/RtpStateReplicator.h | 6 +-
src/RtpStateReplicatorApp.cpp | 15 ++-
src/RtpStateReplicatorListener.cpp | 3 +-
test/CMakeLists.txt | 10 ++
test/TestRTPpjmedia.cpp | 100 ++++++++++++++++++--
13 files changed, 240 insertions(+), 53 deletions(-)
copy config/{test_component.config => test_component_v6.config} (78%)
- Log -----------------------------------------------------------------
commit 1476d4d52c1cf76a287c34031310d3c71b30bf5b
Author: Joshua Colp <jcolp at digium.com>
Date: Mon May 23 11:43:54 2011 -0300
Update component to use new configuration replicator interface which is presented using a facet on the state replicator.
diff --git a/local-slice/RtpStateReplicationIf.ice b/local-slice/RtpStateReplicationIf.ice
index 385b5a1..a8b4ac8 100644
--- a/local-slice/RtpStateReplicationIf.ice
+++ b/local-slice/RtpStateReplicationIf.ice
@@ -70,7 +70,6 @@ module V1
void removeState(Ice::StringSeq items);
idempotent RtpStateItemSeq getState(Ice::StringSeq itemKeys);
idempotent RtpStateItemSeq getAllState();
- void registerConfigurationService(AsteriskSCF::System::Configuration::V1::ConfigurationService *service);
};
class RtpGeneralStateItem extends RtpStateItem
diff --git a/src/MediaRTPpjmedia.cpp b/src/MediaRTPpjmedia.cpp
index e106028..b05c352 100644
--- a/src/MediaRTPpjmedia.cpp
+++ b/src/MediaRTPpjmedia.cpp
@@ -534,7 +534,9 @@ void MediaRTPpjmediaApp::start(const std::string&, const Ice::CommunicatorPtr& c
}
else if (mStateReplicator)
{
- mStateReplicator->registerConfigurationService(mConfigurationServiceProxy);
+ ConfigurationReplicatorPrx configurationReplicator = ConfigurationReplicatorPrx::checkedCast(
+ mStateReplicator.initialize(), ReplicatorFacet);
+ configurationReplicator->registerConfigurationService(mConfigurationServiceProxy);
}
if (mStateReplicator)
diff --git a/src/RtpStateReplicatorApp.cpp b/src/RtpStateReplicatorApp.cpp
index a65f52b..189e809 100644
--- a/src/RtpStateReplicatorApp.cpp
+++ b/src/RtpStateReplicatorApp.cpp
@@ -26,6 +26,7 @@
#include <AsteriskSCF/Logger/IceLogger.h>
#include <AsteriskSCF/logger.h>
#include <AsteriskSCF/CollocatedIceStorm/CollocatedIceStorm.h>
+#include <AsteriskSCF/System/Component/ConfigurationIf.h>
#include "RtpConfigurationIf.h"
#include "RtpStateReplicator.h"
@@ -38,6 +39,7 @@ using namespace AsteriskSCF::System::Logging;
using namespace AsteriskSCF::Media::RTP::V1;
using namespace AsteriskSCF::Media::RTP;
using namespace AsteriskSCF::CollocatedIceStorm;
+using namespace AsteriskSCF::System::Configuration::V1;
namespace
{
@@ -53,6 +55,7 @@ public:
mComponentService = 0;
mAdapter = 0;
mStateReplicator = 0;
+ mConfigurationReplicator = 0;
};
virtual void start(const string &name, const Ice::CommunicatorPtr& ic, const Ice::StringSeq& args);
virtual void stop();
@@ -68,6 +71,7 @@ private:
ConfiguredIceLoggerPtr mIceLogger;
ComponentServicePtr mComponentService;
RtpStateReplicatorIPtr mStateReplicator;
+ ConfigurationReplicatorPtr mConfigurationReplicator;
CollocatedIceStormPtr mIceStorm;
Ice::ObjectPrx mConfigurationPublisher;
Discovery::V1::ServiceManagementPrx mConfigurationManagement;
@@ -144,16 +148,16 @@ private:
typedef IceUtil::Handle<RtpConfigurationCompare> RtpConfigurationComparePtr;
-class RtpStateReplicatorConfigI : public RtpStateReplicatorI
+class ConfigurationReplicatorI : public ConfigurationReplicator
{
public:
- RtpStateReplicatorConfigI(const IceStorm::TopicPrx& topic) : mConfigurationReplicationTopic(topic) { };
+ ConfigurationReplicatorI(const IceStorm::TopicPrx& topic) : mConfigurationReplicationTopic(topic) { };
void registerConfigurationService(const AsteriskSCF::System::Configuration::V1::ConfigurationServicePrx&, const Ice::Current&);
private:
IceStorm::TopicPrx mConfigurationReplicationTopic;
};
-void RtpStateReplicatorConfigI::registerConfigurationService(const AsteriskSCF::System::Configuration::V1::ConfigurationServicePrx& service, const Ice::Current&)
+void ConfigurationReplicatorI::registerConfigurationService(const AsteriskSCF::System::Configuration::V1::ConfigurationServicePrx& service, const Ice::Current&)
{
if (mConfigurationReplicationTopic)
{
@@ -316,8 +320,11 @@ void RtpStateReplicatorService::initialize(const string& appName, const Ice::Com
// Create and publish our ComponentService interface support.
mComponentService = new ComponentServiceImpl(*this);
mAdapter->add(mComponentService, ic->stringToIdentity(ComponentServiceId));
- mStateReplicator = new RtpStateReplicatorConfigI(topic);
+ mStateReplicator = new RtpStateReplicatorI();
mAdapter->add(mStateReplicator, ic->stringToIdentity(ServiceDiscoveryId));
+ mConfigurationReplicator = new ConfigurationReplicatorI(topic);
+ mAdapter->addFacet(mConfigurationReplicator, ic->stringToIdentity(ServiceDiscoveryId),
+ ReplicatorFacet);
mAdapter->activate();
}
commit 468c255b5ece37590cdbdd1121541dfc305f2d95
Merge: 70dab79 2fe8c61
Author: Joshua Colp <jcolp at digium.com>
Date: Mon May 23 11:22:25 2011 -0300
Merge branch 'master' into configuration-replication
Conflicts:
config/RtpConfigurator.py
config/test_media_rtp_pjmedia.conf
local-slice/RtpConfigurationIf.ice
src/MediaRTPpjmedia.cpp
src/RTPSession.cpp
src/RTPSession.h
src/RtpStateReplicator.h
src/RtpStateReplicatorListener.cpp
diff --cc src/MediaRTPpjmedia.cpp
index 3c397e0,9989ddb..e106028
--- a/src/MediaRTPpjmedia.cpp
+++ b/src/MediaRTPpjmedia.cpp
@@@ -525,6 -492,17 +557,17 @@@ void MediaRTPpjmediaApp::start(const st
}
}
+ ServiceLocatorParamsComparePtr rtpmediacomparatorservice = new RTPMediaServiceCompareServiceImpl();
+ ServiceLocatorParamsComparePrx RTPMediaComparatorServiceProxy = ServiceLocatorParamsComparePrx::uncheckedCast(
+ mGlobalAdapter->add(rtpmediacomparatorservice, mCommunicator->stringToIdentity(MediaComparatorServiceId)));
+
+ if (mReplicaService->isActive() == true)
+ {
+ mGeneralState->mComparatorId = IceUtil::generateUUID();
- management->addCompare(mGeneralState->mComparatorId, RTPMediaComparatorServiceProxy);
++ mManagement->addCompare(mGeneralState->mComparatorId, RTPMediaComparatorServiceProxy);
+ }
+
+
RTPMediaServicePrx RTPMediaServiceProxy = RTPMediaServicePrx::uncheckedCast(mGlobalAdapter->add(rtpmediaservice,
mCommunicator->stringToIdentity(MediaServiceId)));
@@@ -533,12 -511,14 +576,14 @@@
if (mReplicaService->isActive() == true)
{
mGeneralState->mServiceManagement = ServiceManagementPrx::uncheckedCast(
- management->addService(RTPMediaServiceProxy, "media_rtp_pjmedia"));
+ mManagement->addService(RTPMediaServiceProxy, "media_rtp_pjmedia"));
/* Now we can add some parameters to help find us. */
- genericparams->category = "rtp";
- mGeneralState->mServiceManagement->addLocatorParams(genericparams, "");
+ rtpparams->category = "rtp";
+ mGeneralState->mServiceManagement->addLocatorParams(rtpparams, mGeneralState->mComparatorId);
}
+ ServiceLocatorParamsPtr genericparams = new ServiceLocatorParams();
+
/* One must provide a component service to manage us, if someone wants to */
ComponentServicePtr ComponentService = new ComponentServicepjmediaImpl(*this, mGeneralState);
ComponentServicePrx ComponentServiceProxy =
@@@ -569,14 -549,9 +614,17 @@@ void MediaRTPpjmediaApp::stop(
if (mReplicaService->isActive() == true)
{
mGeneralState->mServiceManagement->unregister();
+ }
+ if (mConfigurationManagement)
+ {
+ mConfigurationManagement->unregister();
+ }
+ if (!mConfigCompareGuid.empty())
+ {
+ mManagement->removeCompare(mConfigCompareGuid);
+ ServiceLocatorManagementPrx management =
+ ServiceLocatorManagementPrx::checkedCast(mCommunicator->propertyToProxy("ServiceLocatorManagementProxy"));
+ management->removeCompare(mGeneralState->mComparatorId);
}
mCommunicator->destroy();
}
commit 2fe8c6154989313a4f40eb45cccd5b946f8f495b
Author: Joshua Colp <jcolp at digium.com>
Date: Thu May 19 16:39:40 2011 -0300
Fix issue when running tests on a system without IPv6.
diff --git a/config/test_component_v6.config b/config/test_component_v6.config
new file mode 100644
index 0000000..250ad0e
--- /dev/null
+++ b/config/test_component_v6.config
@@ -0,0 +1,64 @@
+# This is a configuration file used in conjunction with the media_rtp_pjmedia test driver
+
+#
+# Icebox Configuration
+#
+
+IceBox.InheritProperties=1
+IceBox.LoadOrder=ServiceDiscovery,RtpStateReplicator,MediaRTPpjmedia,MediaRTPpjmediaTest
+
+# RtpStateReplicator Configuration
+IceBox.Service.RtpStateReplicator=RtpStateReplicator:create
+
+# Adapter parameters for this component
+RtpStateReplicator.Endpoints=tcp:udp
+
+# A proxy to the service locator management service
+LocatorServiceManagement.Proxy=LocatorServiceManagement:tcp -p 4422
+
+# A proxy to the service locator service
+LocatorService.Proxy=LocatorService:tcp -p 4411
+
+#
+# media_rtp_pjmedia Configuration
+#
+
+IceBox.Service.MediaRTPpjmedia=media_rtp_pjmedia:create
+
+# Adapter parameters for this component
+MediaRTPpjmediaAdapter.Endpoints=default
+MediaRTPpjmediaAdapterLocal.Endpoints=default
+MediaRTPpjmediaAdapterLogger.Endpoints=default
+
+# A proxy to the service locator management service
+ServiceLocatorManagementProxy=LocatorServiceManagement:tcp -p 4422
+
+# A proxy to the service locator service
+ServiceLocatorProxy=LocatorService:tcp -p 4411
+
+#
+# media_rtp_pjmedia_test Configuration
+#
+
+IceBox.Service.MediaRTPpjmediaTest=media_rtp_pjmedia_test_v6:create
+
+#
+# Service Locator Configuration
+#
+
+IceBox.Service.ServiceDiscovery=service_locator:create
+
+AsteriskSCFIceStorm.InstanceName=AsteriskSCFIceStorm
+AsteriskSCFIceStorm.TopicManager.Endpoints=default -p 10000
+AsteriskSCFIceStorm.Publish.Endpoints=tcp -p 10001:udp -p 10001
+AsteriskSCFIceStorm.Trace.TopicManager=2
+AsteriskSCFIceStorm.Transient=1
+AsteriskSCFIceStorm.Flush.Timeout=2000
+TopicManager.Proxy=AsteriskSCFIceStorm/TopicManager:default -p 10000
+
+ServiceLocatorManagementAdapter.Endpoints=tcp -p 4422
+ServiceLocatorAdapter.Endpoints=tcp -p 4411
+ServiceLocatorLocalAdapter.Endpoints=tcp -p 4412
+LocatorService.Proxy=LocatorService:tcp -p 4411
+
+LoggerAdapter.Endpoints=default
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index f65d5b2..87662d4 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -9,16 +9,17 @@ asterisk_scf_component_add_boost_libraries(media_rtp_pjmedia_test unit_test_fram
asterisk_scf_component_build_icebox(media_rtp_pjmedia_test)
target_link_libraries(media_rtp_pjmedia_test asterisk-scf-api)
-add_definitions(-DIPV6_TEST)
asterisk_scf_component_init(media_rtp_pjmedia_test_v6)
asterisk_scf_component_add_file(media_rtp_pjmedia_test_v6 TestRTPpjmedia.cpp)
asterisk_scf_component_add_slice(media_rtp_pjmedia_test_v6 ../local-slice/RtpStateReplicationIf.ice)
asterisk_scf_component_add_boost_libraries(media_rtp_pjmedia_test_v6 unit_test_framework thread date_time)
asterisk_scf_component_build_icebox(media_rtp_pjmedia_test_v6)
+pjproject_link(media_rtp_pjmedia_test_v6 pjlib)
+set_property(TARGET media_rtp_pjmedia_test_v6 PROPERTY COMPILE_DEFINITIONS IPV6_TEST)
target_link_libraries(media_rtp_pjmedia_test_v6 asterisk-scf-api)
# integration test
if(integrated_build STREQUAL "true")
asterisk_scf_test_icebox(media_rtp_pjmedia_test config/test_component.config)
- asterisk_scf_test_icebox(media_rtp_pjmedia_test_v6 config/test_component.config)
+ asterisk_scf_test_icebox(media_rtp_pjmedia_test_v6 config/test_component_v6.config)
endif()
diff --git a/test/TestRTPpjmedia.cpp b/test/TestRTPpjmedia.cpp
index 810963e..3f5e06e 100644
--- a/test/TestRTPpjmedia.cpp
+++ b/test/TestRTPpjmedia.cpp
@@ -15,6 +15,7 @@
*/
#ifdef IPV6_TEST
#define BOOST_TEST_MODULE RTPpjmediaTestSuitev6
+#include <pj/config_site.h>
#else
#define BOOST_TEST_MODULE RTPpjmediaTestSuite
#endif
@@ -309,6 +310,8 @@ private:
BOOST_GLOBAL_FIXTURE(GlobalIceFixture);
+#if !defined(IPV6_TEST) || defined(PJ_HAS_IPV6)
+
/**
* Confirm that we find the rtp media session service based on category
*/
@@ -1111,6 +1114,17 @@ BOOST_AUTO_TEST_CASE(ReleaseRTPSession)
BOOST_CHECK(released);
}
+#else
+
+/**
+ * Dummy test in case IPv6 is not supported
+ */
+BOOST_AUTO_TEST_CASE(DummyTest)
+{
+}
+
+#endif
+
void MediaRTPpjmediaTest::start(std::string const& name,
Ice::CommunicatorPtr const&,
Ice::StringSeq const& args)
commit 103929b269e3391fa36b4d34c81ca96efa5eaaef
Author: Joshua Colp <jcolp at digium.com>
Date: Thu May 19 15:40:47 2011 -0300
Fix some incorrect assertions. I always screw these up.
diff --git a/src/RTPSession.cpp b/src/RTPSession.cpp
index ae9423c..7bbf164 100644
--- a/src/RTPSession.cpp
+++ b/src/RTPSession.cpp
@@ -144,7 +144,7 @@ RTPSessionImpl::RTPSessionImpl(const Ice::ObjectAdapterPtr& adapter, const RTPSe
/* Create an endpoint in pjmedia for our media. */
pj_status_t status = pjmedia_endpt_create(factory, NULL, configurationService->getWorkerThreadCount(), &mImpl->mEndpoint);
- assert(status != PJ_SUCCESS);
+ assert(status == PJ_SUCCESS);
int af = (params->ipv6 == true) ? pj_AF_INET6() : pj_AF_INET();
int minimumPort = configurationService->getStartPort();
@@ -193,7 +193,7 @@ RTPSessionImpl::RTPSessionImpl(const Ice::ObjectAdapterPtr& adapter, pj_pool_fac
pj_status_t status = pjmedia_endpt_create(factory, NULL, configurationService->getWorkerThreadCount(), &mImpl->mEndpoint);
- assert(status != PJ_SUCCESS);
+ assert(status == PJ_SUCCESS);
int af = (ipv6 == true) ? pj_AF_INET6() : pj_AF_INET();
commit d77ecdc4c1ed57239a797b59be19f9841085cbe4
Author: Joshua Colp <jcolp at digium.com>
Date: Mon May 2 14:47:29 2011 -0300
Add support for configuration.
diff --git a/config/Rtp.config b/config/Rtp.config
new file mode 100644
index 0000000..d8da13e
--- /dev/null
+++ b/config/Rtp.config
@@ -0,0 +1,12 @@
+# General Configuration Options
+
+[general]
+
+# Start port for RTP and RTCP ports
+startport=10000
+
+# End port for RTP and RTCP ports
+endport=20000
+
+# Number of worker threads for incoming media
+workerthreadcount=4
diff --git a/config/RtpConfigurator.py b/config/RtpConfigurator.py
new file mode 100755
index 0000000..87b3f65
--- /dev/null
+++ b/config/RtpConfigurator.py
@@ -0,0 +1,52 @@
+#!/usr/bin/env python
+
+#
+# Asterisk SCF -- An open-source communications framework.
+#
+# Copyright (C) 2011, Digium, Inc.
+#
+# See http://www.asterisk.org for more information about
+# the Asterisk SCF project. Please do not directly contact
+# any of the maintainers of this project for assistance;
+# the project provides a web site, mailing lists and IRC
+# channels for your use.
+#
+# This program is free software, distributed under the terms of
+# the GNU General Public License Version 2. See the LICENSE.txt file
+# at the top of the source tree.
+#
+
+# Rtp configurator
+
+# Bring in the common configuration infrastructure
+import Ice, Configurator, sys
+
+# Load our component specific configuration definitions
+Ice.loadSlice('-I. -I/opt/Ice-3.4.1/slice -I../../slice --all ../local-slice/RtpConfigurationIf.ice')
+import AsteriskSCF.Media.RTP.V1
+
+# Add our own visitor implementations for the sections we support
+class RtpSectionVisitors(Configurator.SectionVisitors):
+ def visit_general(self, config, section):
+ group = AsteriskSCF.Media.RTP.V1.RtpGeneralGroup()
+ group.configurationItems = { }
+
+ mapper = Configurator.OptionMapper()
+
+ portsItem = AsteriskSCF.Media.RTP.V1.PortRangesItem()
+ mapper.map('startport', portsItem, 'startPort', AsteriskSCF.Media.RTP.V1.PortRangesItemName, config.getint, 10000)
+ mapper.map('endport', portsItem, 'endPort', AsteriskSCF.Media.RTP.V1.PortRangesItemName, config.getint, 20000)
+
+ workerItem = AsteriskSCF.Media.RTP.V1.WorkerThreadCountItem()
+ mapper.map('workerthreadcount', workerItem, 'count', AsteriskSCF.Media.RTP.V1.WorkerThreadCountItemName, config.getint, 4)
+
+ for option in config.options(section):
+ mapper.execute(group, section, option)
+
+ mapper.finish(group)
+
+ self.groups.append(group)
+
+# Make a configurator application and let it run
+app = Configurator.ConfiguratorApp('Rtp.config', RtpSectionVisitors())
+sys.exit(app.main(sys.argv))
diff --git a/config/test_media_rtp_pjmedia.conf b/config/test_media_rtp_pjmedia.conf
index 3a451da..480a7ae 100644
--- a/config/test_media_rtp_pjmedia.conf
+++ b/config/test_media_rtp_pjmedia.conf
@@ -4,8 +4,8 @@ IceBox.Service.MediaRTPpjmedia=media_rtp_pjmedia:create
# Adapter parameters for this component
MediaRTPpjmediaAdapter.Endpoints=tcp -p 4423
-MediaRTPpjmediaAdapterLogger.Endpoints=default
-MediaRTPpjmediaAdapterLocal.Endpoints=default
+MediaRTPpjmediaAdapterLogger.Endpoints=default -h 127.0.0.1
+MediaRTPpjmediaAdapterLocal.Endpoints=tcp -p 4424
# A proxy to the service locator management service
ServiceLocatorManagementProxy=LocatorServiceManagement:tcp -p 4422
diff --git a/local-slice/RtpConfigurationIf.ice b/local-slice/RtpConfigurationIf.ice
new file mode 100644
index 0000000..45bbd59
--- /dev/null
+++ b/local-slice/RtpConfigurationIf.ice
@@ -0,0 +1,128 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2011, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+
+#pragma once
+#include <Ice/BuiltinSequences.ice>
+#include <AsteriskSCF/System/Component/ConfigurationIf.ice>
+
+module AsteriskSCF
+{
+
+module Media
+{
+
+module RTP
+{
+
+["suppress"]
+module V1
+{
+ /**
+ * Local visitor class for visiting RTP configuration groups
+ */
+ local class RtpConfigurationGroupVisitor extends AsteriskSCF::System::Configuration::V1::ConfigurationGroupVisitor
+ {
+ };
+
+ /**
+ * Generic RTP configuration group
+ */
+ ["visitor:RtpConfigurationGroupVisitor"] class RtpConfigurationGroup extends AsteriskSCF::System::Configuration::V1::ConfigurationGroup
+ {
+ };
+
+ /**
+ * General RTP configuration group that contains general items related to the RTP component as a whole
+ */
+ class RtpGeneralGroup extends RtpConfigurationGroup
+ {
+ };
+
+ /**
+ * Local visitor class for visiting RTP configuration items
+ */
+ local class RtpConfigurationItemVisitor extends AsteriskSCF::System::Configuration::V1::ConfigurationItemVisitor
+ {
+ };
+
+ /**
+ * Generic RTP configuration item
+ */
+ ["visitor:RtpConfigurationItemVisitor"] class RtpConfigurationItem extends AsteriskSCF::System::Configuration::V1::ConfigurationItem
+ {
+ };
+
+ /**
+ * Name that the port ranges configuration item should be inserted as
+ */
+ const string PortRangesItemName = "ports";
+
+ /**
+ * Port ranges configuration item
+ *
+ * This must be added to the general configuration group using the constant string
+ * in PortRangesItemName
+ *
+ */
+ class PortRangesItem extends RtpConfigurationItem
+ {
+ /**
+ * Start port for RTP and RTCP sockets.
+ *
+ * Default value is 10000.
+ *
+ */
+ int startPort = 10000;
+
+ /**
+ * End port for RTP and RTCP sockets.
+ *
+ * Default value is 20000.
+ *
+ */
+ int endPort = 20000;
+ };
+
+ /**
+ * Name that the worker thread count configuration item should be inserted as
+ */
+ const string WorkerThreadCountItemName = "workerThreadCount";
+
+ /**
+ * Worker thread count for incoming media configuration item
+ *
+ * This must be added to the general configuration group using the constant string
+ * in WorkerThreadCountItemName
+ *
+ */
+ class WorkerThreadCountItem extends RtpConfigurationItem
+ {
+ /**
+ * Number of threads that should handle incoming media.
+ *
+ * Default value is 4.
+ */
+ int count = 4;
+ };
+
+}; /* module V1 */
+
+}; /* module RTP */
+
+}; /* module Media */
+
+}; /* module AsteriskSCF */
+
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index b3396d3..c3be82a 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -17,7 +17,10 @@ asterisk_scf_component_add_file(media_rtp_pjmedia RTPSource.h)
asterisk_scf_component_add_file(media_rtp_pjmedia RTPSink.h)
asterisk_scf_component_add_file(media_rtp_pjmedia RtpStateReplicatorListener.cpp)
asterisk_scf_component_add_file(media_rtp_pjmedia RtpStateReplicator.h)
+asterisk_scf_component_add_file(media_rtp_pjmedia RTPConfiguration.cpp)
+asterisk_scf_component_add_file(media_rtp_pjmedia RTPConfiguration.h)
asterisk_scf_component_add_slice(media_rtp_pjmedia ../local-slice/RtpStateReplicationIf.ice)
+asterisk_scf_component_add_slice(media_rtp_pjmedia ../local-slice/RtpConfigurationIf.ice)
asterisk_scf_component_add_boost_libraries(media_rtp_pjmedia core thread)
asterisk_scf_component_build_icebox(media_rtp_pjmedia)
target_link_libraries(media_rtp_pjmedia logging-client)
diff --git a/src/MediaRTPpjmedia.cpp b/src/MediaRTPpjmedia.cpp
index 3fb0964..9989ddb 100644
--- a/src/MediaRTPpjmedia.cpp
+++ b/src/MediaRTPpjmedia.cpp
@@ -36,6 +36,7 @@
#include "RTPSession.h"
#include "RtpStateReplicator.h"
+#include "RTPConfiguration.h"
using namespace std;
using namespace AsteriskSCF::Core::Discovery::V1;
@@ -53,6 +54,7 @@ Logger lg = getLoggerFactory().getLogger("AsteriskSCF.MediaRTP");
static const string ReplicaServiceId("MediaRtpReplica");
static const string MediaServiceId("RTPMediaService");
static const string MediaComparatorServiceId("RTPMediaServiceComparator");
+static const string ConfigurationServiceId("RTPConfigurationService");
/**
* Implementation of the RTPMediaService interface as defined in MediaRTPIf.ice
@@ -61,7 +63,8 @@ class RTPMediaServiceImpl : public RTPMediaService
{
public:
RTPMediaServiceImpl(const Ice::ObjectAdapterPtr&, const ReplicaPtr&,
- const AsteriskSCF::Discovery::SmartProxy<RtpStateReplicatorPrx>&);
+ const AsteriskSCF::Discovery::SmartProxy<RtpStateReplicatorPrx>&,
+ const ConfigurationServiceImplPtr&);
RTPSessionPrx allocate(const RTPServiceLocatorParamsPtr&, const Ice::Current&);
pj_pool_factory *getPoolFactory() { return &mCachingPool.factory; };
private:
@@ -86,6 +89,11 @@ private:
ReplicaPtr mReplicaService;
/**
+ * A pointer to the configuration service.
+ */
+ ConfigurationServiceImplPtr mConfigurationService;
+
+ /**
* A proxy to the state replicator.
*/
AsteriskSCF::Discovery::SmartProxy<RtpStateReplicatorPrx> mStateReplicator;
@@ -234,6 +242,11 @@ private:
ReplicaPtr mReplicaService;
/**
+ * Instance of our configuration service implementation.
+ */
+ ConfigurationServiceImplPtr mConfigurationService;
+
+ /**
* Instance of our state replicator listener.
*/
RtpStateReplicatorListenerPtr mReplicatorListener;
@@ -360,8 +373,10 @@ private:
* Constructor for the RTPMediaServiceImpl class.
*/
RTPMediaServiceImpl::RTPMediaServiceImpl(const Ice::ObjectAdapterPtr& adapter, const ReplicaPtr& replicaService,
- const AsteriskSCF::Discovery::SmartProxy<RtpStateReplicatorPrx>& stateReplicator) :
- mAdapter(adapter), mReplicaService(replicaService), mStateReplicator(stateReplicator)
+ const AsteriskSCF::Discovery::SmartProxy<RtpStateReplicatorPrx>& stateReplicator,
+ const ConfigurationServiceImplPtr& configurationService) :
+ mAdapter(adapter), mReplicaService(replicaService), mConfigurationService(configurationService),
+ mStateReplicator(stateReplicator)
{
/* Initialize the memory caching pool using default policy as specified by pjlib. */
pj_caching_pool_init(&mCachingPool, &pj_pool_factory_default_policy, 0);
@@ -376,7 +391,8 @@ RTPMediaServiceImpl::RTPMediaServiceImpl(const Ice::ObjectAdapterPtr& adapter, c
RTPSessionPrx RTPMediaServiceImpl::allocate(const RTPServiceLocatorParamsPtr& params, const Ice::Current&)
{
RTPSessionImplPtr session =
- new RTPSessionImpl(mAdapter, params, &mCachingPool.factory, mReplicaService, mStateReplicator);
+ new RTPSessionImpl(mAdapter, params, &mCachingPool.factory, mReplicaService, mStateReplicator,
+ mConfigurationService);
return session->getProxy();
}
@@ -420,6 +436,9 @@ void MediaRTPpjmediaApp::start(const std::string&, const Ice::CommunicatorPtr& c
mReplicaService = new ReplicaImpl(mLocalAdapter);
mLocalAdapter->add(mReplicaService, mCommunicator->stringToIdentity(ReplicaServiceId));
+ mConfigurationService = new ConfigurationServiceImpl();
+ mLocalAdapter->add(mConfigurationService, mCommunicator->stringToIdentity(ConfigurationServiceId));
+
mLocalAdapter->activate();
mGlobalAdapter = mCommunicator->createObjectAdapter("MediaRTPpjmediaAdapter");
@@ -451,12 +470,13 @@ void MediaRTPpjmediaApp::start(const std::string&, const Ice::CommunicatorPtr& c
}
RTPMediaServiceImplPtr rtpmediaservice =
- new RTPMediaServiceImpl(mGlobalAdapter, mReplicaService, mStateReplicator);
+ new RTPMediaServiceImpl(mGlobalAdapter, mReplicaService, mStateReplicator, mConfigurationService);
if (mStateReplicator)
{
mReplicatorListener =
- new RtpStateReplicatorListenerI(mGlobalAdapter, rtpmediaservice->getPoolFactory(), mGeneralState);
+ new RtpStateReplicatorListenerI(mGlobalAdapter, rtpmediaservice->getPoolFactory(), mGeneralState,
+ mConfigurationService);
mReplicatorListenerProxy =
RtpStateReplicatorListenerPrx::uncheckedCast(mLocalAdapter->addWithUUID(mReplicatorListener));
diff --git a/src/RTPConfiguration.cpp b/src/RTPConfiguration.cpp
new file mode 100644
index 0000000..1fef9f5
--- /dev/null
+++ b/src/RTPConfiguration.cpp
@@ -0,0 +1,402 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2011, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+
+#include <IceUtil/UUID.h>
+
+#include <boost/thread.hpp>
+#include <boost/shared_ptr.hpp>
+#include <boost/thread/shared_mutex.hpp>
+
+#include <AsteriskSCF/System/Component/ConfigurationIf.h>
+
+#include "RtpConfigurationIf.h"
+#include "RTPConfiguration.h"
+
+using namespace AsteriskSCF::System::Configuration::V1;
+using namespace AsteriskSCF::Media::RTP::V1;
+
+class ConfigurationServiceImplPriv
+{
+public:
+ /**
+ * General RTP configuration
+ */
+ RtpGeneralGroupPtr mGeneralGroup;
+
+ /**
+ * Shared mutex lock which protects the configuration
+ */
+ boost::shared_mutex mLock;
+};
+
+ConfigurationServiceImpl::ConfigurationServiceImpl() : mImplPriv(new ConfigurationServiceImplPriv())
+{
+}
+
+ConfigurationGroupSeq ConfigurationServiceImpl::getConfiguration(const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq& groups, const Ice::Current&)
+{
+ class GroupVisitor : public RtpConfigurationGroupVisitor
+ {
+ public:
+ GroupVisitor(boost::shared_ptr<ConfigurationServiceImplPriv> implPriv, ConfigurationGroupSeq& visitorGroups) : mImplPriv(implPriv), mGroups(visitorGroups) { };
+
+ private:
+ /**
+ * Internal helper function which determines what configuration items should be returned
+ */
+ void insertRequestedConfigurationItems(const ConfigurationItemDict& requestedItems,
+ const ConfigurationItemDict& localItems,
+ ConfigurationItemDict& returnedItems)
+ {
+
+ boost::shared_lock<boost::shared_mutex> lock(mImplPriv->mLock);
+
+ for (ConfigurationItemDict::const_iterator requestedItem = requestedItems.cbegin();
+ requestedItem != requestedItems.end();
+ ++requestedItem)
+ {
+ ConfigurationItemDict::const_iterator localItem = localItems.find(requestedItem->first);
+
+ if (localItem != localItems.end())
+ {
+ returnedItems.insert((*requestedItem));
+ }
+ }
+ }
+
+ void visitRtpGeneralGroup(const ::AsteriskSCF::Media::RTP::V1::RtpGeneralGroupPtr& group)
+ {
+ if (!mImplPriv->mGeneralGroup)
+ {
+ return;
+ }
+
+ RtpGeneralGroupPtr returnedGroup = new RtpGeneralGroup();
+
+ insertRequestedConfigurationItems(group->configurationItems, mImplPriv->mGeneralGroup->configurationItems, returnedGroup->configurationItems);
+
+ mGroups.push_back(returnedGroup);
+ };
+
+ boost::shared_ptr<ConfigurationServiceImplPriv> mImplPriv;
+ ConfigurationGroupSeq& mGroups;
+ };
+
+ ConfigurationGroupSeq newGroups;
+ RtpConfigurationGroupVisitorPtr v = new GroupVisitor(mImplPriv, newGroups);
+
+ for (ConfigurationGroupSeq::const_iterator group = groups.begin(); group != groups.end(); ++group)
+ {
+ (*group)->visit(v);
+ }
+
+ return newGroups;
+}
+
+ConfigurationGroupSeq ConfigurationServiceImpl::getConfigurationAll(const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq& groups, const Ice::Current&)
+{
+ class GroupVisitor : public RtpConfigurationGroupVisitor
+ {
+ public:
+ GroupVisitor(boost::shared_ptr<ConfigurationServiceImplPriv> implPriv, ConfigurationGroupSeq& visitorGroups) :
+ mImplPriv(implPriv), mGroups(visitorGroups) { };
+
+ private:
+ void visitRtpGeneralGroup(const ::AsteriskSCF::Media::RTP::V1::RtpGeneralGroupPtr&)
+ {
+ if (!mImplPriv->mGeneralGroup)
+ {
+ return;
+ }
+
+ mGroups.push_back(mImplPriv->mGeneralGroup);
+ };
+
+ boost::shared_ptr<ConfigurationServiceImplPriv> mImplPriv;
+ ConfigurationGroupSeq& mGroups;
+ };
+
+ ConfigurationGroupSeq newGroups;
+ RtpConfigurationGroupVisitorPtr v = new GroupVisitor(mImplPriv, newGroups);
+
+ boost::shared_lock<boost::shared_mutex> lock(mImplPriv->mLock);
+
+ for (ConfigurationGroupSeq::const_iterator group = groups.begin(); group != groups.end(); ++group)
+ {
+ (*group)->visit(v);
+ }
+
+ return newGroups;
+}
+
+ConfigurationGroupSeq ConfigurationServiceImpl::getConfigurationGroups(const Ice::Current&)
+{
+ ConfigurationGroupSeq groups;
+
+ boost::shared_lock<boost::shared_mutex> lock(mImplPriv->mLock);
+
+ if (mImplPriv->mGeneralGroup)
+ {
+ RtpGeneralGroupPtr general = new RtpGeneralGroup();
+ groups.push_back(general);
+ }
+
+ return groups;
+}
+
+void ConfigurationServiceImpl::setConfiguration(const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq& groups, const Ice::Current&)
+{
+    // Visitor which merges every pushed group into the locally stored configuration
+    class GroupVisitor : public RtpConfigurationGroupVisitor
+    {
+    public:
+        GroupVisitor(boost::shared_ptr<ConfigurationServiceImplPriv> implPriv) : mImplPriv(implPriv) { };
+
+    private:
+        /**
+         * Rejects the push with SerialConflict as soon as one pushed item carries a lower
+         * serial number than the item stored locally under the same name. Items with a
+         * serial number of -1, and items not stored locally yet, are never rejected.
+         */
+        void performSerialCheck(const ConfigurationItemDict& changedItems, const ConfigurationItemDict& localItems,
+            const AsteriskSCF::System::Configuration::V1::ConfigurationGroupPtr& group)
+        {
+            for (ConfigurationItemDict::const_iterator changed = changedItems.begin();
+                 changed != changedItems.end();
+                 ++changed)
+            {
+                if (changed->second->serialNumber == -1)
+                {
+                    continue;
+                }
+
+                ConfigurationItemDict::const_iterator existing = localItems.find(changed->first);
+
+                if (existing != localItems.end() &&
+                    changed->second->serialNumber < existing->second->serialNumber)
+                {
+                    throw SerialConflict(group, changed->second);
+                }
+            }
+        }
+
+        // Creates the general group on first use (otherwise serial checks against it),
+        // then stores every pushed item over any stored item of the same name.
+        void visitRtpGeneralGroup(const ::AsteriskSCF::Media::RTP::V1::RtpGeneralGroupPtr& group)
+        {
+            if (mImplPriv->mGeneralGroup)
+            {
+                performSerialCheck(group->configurationItems, mImplPriv->mGeneralGroup->configurationItems, group);
+            }
+            else
+            {
+                mImplPriv->mGeneralGroup = new RtpGeneralGroup();
+            }
+
+            ConfigurationItemDict& stored = mImplPriv->mGeneralGroup->configurationItems;
+
+            for (ConfigurationItemDict::const_iterator pushed = group->configurationItems.begin();
+                 pushed != group->configurationItems.end();
+                 ++pushed)
+            {
+                stored[pushed->first] = pushed->second;
+            }
+        }
+
+        boost::shared_ptr<ConfigurationServiceImplPriv> mImplPriv;
+    };
+
+    RtpConfigurationGroupVisitorPtr visitor = new GroupVisitor(mImplPriv);
+
+    boost::unique_lock<boost::shared_mutex> writeLock(mImplPriv->mLock);
+
+    for (ConfigurationGroupSeq::const_iterator current = groups.begin(); current != groups.end(); ++current)
+    {
+        (*current)->visit(visitor);
+    }
+}
+
+void ConfigurationServiceImpl::removeConfigurationItems(const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq& groups, const Ice::Current&)
+{
+    class GroupVisitor : public RtpConfigurationGroupVisitor
+    {
+    public:
+        GroupVisitor(boost::shared_ptr<ConfigurationServiceImplPriv> implPriv) : mImplPriv(implPriv) { };
+
+        /**
+         * Erases from localItems every entry whose key also appears in itemsToRemove.
+         * When a visitor is supplied, the requested item visits it just before the erase.
+         */
+        void removeItems(RtpConfigurationItemVisitor* visitor, ConfigurationItemDict& itemsToRemove,
+            ConfigurationItemDict& localItems)
+        {
+            ConfigurationItemDict::const_iterator requested = itemsToRemove.begin();
+            for (; requested != itemsToRemove.end(); ++requested)
+            {
+                ConfigurationItemDict::iterator stored = localItems.find(requested->first);
+                if (stored != localItems.end())
+                {
+                    if (visitor != 0)
+                    {
+                        requested->second->visit(visitor);
+                    }
+                    localItems.erase(stored);
+                }
+            }
+        }
+
+        void visitRtpGeneralGroup(const ::AsteriskSCF::Media::RTP::V1::RtpGeneralGroupPtr& group)
+        {
+            if (mImplPriv->mGeneralGroup)
+            {
+                removeItems(0, group->configurationItems, mImplPriv->mGeneralGroup->configurationItems);
+            }
+        };
+
+    private:
+        boost::shared_ptr<ConfigurationServiceImplPriv> mImplPriv;
+    };
+
+    RtpConfigurationGroupVisitorPtr visitor = new GroupVisitor(mImplPriv);
+
+    boost::unique_lock<boost::shared_mutex> writeLock(mImplPriv->mLock);
+
+    for (ConfigurationGroupSeq::const_iterator current = groups.begin(); current != groups.end(); ++current)
+    {
+        (*current)->visit(visitor);
+    }
+}
+
+void ConfigurationServiceImpl::removeConfigurationGroups(const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq& groups, const Ice::Current&)
+{
+    // Visitor which discards whichever stored group it is asked to visit
+    class GroupVisitor : public RtpConfigurationGroupVisitor
+    {
+    public:
+        GroupVisitor(boost::shared_ptr<ConfigurationServiceImplPriv> implPriv) : mImplPriv(implPriv) { };
+
+    private:
+        /**
+         * Releases the stored general group. The contents of the group passed in are
+         * ignored, and clearing a group which was never set leaves it unset.
+         */
+        void visitRtpGeneralGroup(const ::AsteriskSCF::Media::RTP::V1::RtpGeneralGroupPtr&)
+        {
+            mImplPriv->mGeneralGroup = 0;
+        };
+
+        boost::shared_ptr<ConfigurationServiceImplPriv> mImplPriv;
+    };
+
+    RtpConfigurationGroupVisitorPtr visitor = new GroupVisitor(mImplPriv);
+
+    boost::unique_lock<boost::shared_mutex> writeLock(mImplPriv->mLock);
+
+    for (ConfigurationGroupSeq::const_iterator current = groups.begin(); current != groups.end(); ++current)
+    {
+        (*current)->visit(visitor);
+    }
+}
+
+/**
+ * Helper which selects the port range item both port accessors below report from.
+ *
+ * The configured item is only selected when the range is usable as a whole: the end
+ * port lies above the start port and both ports are even. In every other case (no
+ * general group, no port range item, an item of another type, an unusable range) a
+ * default constructed PortRangesItem is selected instead. Judging the range as a
+ * whole keeps the start and end port handed out from the same range, rather than
+ * pairing one configured bound with one default bound.
+ *
+ * The caller must hold the configuration lock.
+ */
+static PortRangesItemPtr selectPortRange(const RtpGeneralGroupPtr& generalGroup)
+{
+    PortRangesItemPtr portsDefault = new PortRangesItem();
+
+    if (!generalGroup)
+    {
+        return portsDefault;
+    }
+
+    ConfigurationItemDict::iterator item = generalGroup->configurationItems.find(PortRangesItemName);
+
+    if (item == generalGroup->configurationItems.end())
+    {
+        return portsDefault;
+    }
+
+    PortRangesItemPtr ports = PortRangesItemPtr::dynamicCast(item->second);
+
+    if (!ports || (ports->endPort <= ports->startPort) || (ports->startPort % 2) || (ports->endPort % 2))
+    {
+        return portsDefault;
+    }
+
+    return ports;
+}
+
+/**
+ * Internal function which returns configured start port, or the default start port
+ * when the configured range is missing or unusable.
+ */
+int ConfigurationServiceImpl::getStartPort()
+{
+    boost::shared_lock<boost::shared_mutex> lock(mImplPriv->mLock);
+
+    return selectPortRange(mImplPriv->mGeneralGroup)->startPort;
+}
+
+/**
+ * Internal function which returns configured end port, or the default end port
+ * when the configured range is missing or unusable.
+ */
+int ConfigurationServiceImpl::getEndPort()
+{
+    boost::shared_lock<boost::shared_mutex> lock(mImplPriv->mLock);
+
+    return selectPortRange(mImplPriv->mGeneralGroup)->endPort;
+}
+
+/**
+ * Internal function which reports how many worker threads have been configured.
+ *
+ * The count of a default constructed WorkerThreadCountItem is reported when there is
+ * no general group, no worker thread count item in it, or the item found under that
+ * name is not actually a WorkerThreadCountItem.
+ */
+int ConfigurationServiceImpl::getWorkerThreadCount()
+{
+    boost::shared_lock<boost::shared_mutex> readLock(mImplPriv->mLock);
+    WorkerThreadCountItemPtr chosen = new WorkerThreadCountItem();
+
+    if (mImplPriv->mGeneralGroup)
+    {
+        ConfigurationItemDict& stored = mImplPriv->mGeneralGroup->configurationItems;
+        ConfigurationItemDict::iterator found = stored.find(WorkerThreadCountItemName);
+
+        if (found != stored.end())
+        {
+            WorkerThreadCountItemPtr configured = WorkerThreadCountItemPtr::dynamicCast(found->second);
+            if (configured)
+            {
+                chosen = configured;
+            }
+        }
+    }
+
+    return chosen->count;
+}
diff --git a/src/RTPConfiguration.h b/src/RTPConfiguration.h
new file mode 100644
index 0000000..96d2180
--- /dev/null
+++ b/src/RTPConfiguration.h
@@ -0,0 +1,56 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2011, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+
+#pragma once
+
+#include <Ice/Ice.h>
+#include <boost/shared_ptr.hpp>
+#include <AsteriskSCF/System/Component/ConfigurationIf.h>
+
+/*
+ * Private implementation class for ConfigurationServiceImpl.
+ */
+class ConfigurationServiceImplPriv;
+
+/**
+ * Implementation of the configuration service.
+ */
+class ConfigurationServiceImpl : public AsteriskSCF::System::Configuration::V1::ConfigurationService
+{
+public:
+    ConfigurationServiceImpl();
+    // Operations of the ConfigurationService interface
+    AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq getConfiguration(const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq&, const Ice::Current&);
+    AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq getConfigurationAll(const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq&, const Ice::Current&);
+    AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq getConfigurationGroups(const Ice::Current&);
+    void setConfiguration(const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq&, const Ice::Current&);
+    void removeConfigurationItems(const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq&, const Ice::Current&);
+    void removeConfigurationGroups(const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq&, const Ice::Current&);
+    // Internal accessors for the configured values (not part of the Ice interface)
+    int getStartPort();
+    int getEndPort();
+    int getWorkerThreadCount();
+private:
+    /**
+     * Private implementation details.
+     */
+    boost::shared_ptr<ConfigurationServiceImplPriv> mImplPriv;
+};
+
+/**
+ * A typedef which creates a smart pointer type for ConfigurationServiceImpl.
+ */
+typedef IceUtil::Handle<ConfigurationServiceImpl> ConfigurationServiceImplPtr;
diff --git a/src/RTPSession.cpp b/src/RTPSession.cpp
index bf97327..ae9423c 100644
--- a/src/RTPSession.cpp
+++ b/src/RTPSession.cpp
@@ -30,6 +30,8 @@
#include "RTPSource.h"
#include "RTPSink.h"
+#include "RTPConfiguration.h"
+
using namespace std;
using namespace AsteriskSCF::Core::Discovery::V1;
using namespace AsteriskSCF::Media::V1;
@@ -54,8 +56,8 @@ class RTPSessionImplPriv
{
public:
RTPSessionImplPriv(const Ice::ObjectAdapterPtr& adapter, const FormatSeq& formats,
- const ReplicaPtr& replicaService,
- const AsteriskSCF::Discovery::SmartProxy<RtpStateReplicatorPrx>& stateReplicator) :
+ const ReplicaPtr& replicaService,
+ const AsteriskSCF::Discovery::SmartProxy<RtpStateReplicatorPrx>& stateReplicator) :
mAdapter(adapter), mFormats(formats),
mSessionStateItem(new RtpSessionStateItem()),
mReplicaService(replicaService), mStateReplicator(stateReplicator) { };
@@ -131,25 +133,25 @@ public:
* Constructor for the RTPSessionImpl class (used by Ice).
*/
RTPSessionImpl::RTPSessionImpl(const Ice::ObjectAdapterPtr& adapter, const RTPServiceLocatorParamsPtr& params,
- pj_pool_factory* factory, const ReplicaPtr& replicaService,
- const AsteriskSCF::Discovery::SmartProxy<RtpStateReplicatorPrx>& stateReplicator) :
+ pj_pool_factory* factory, const ReplicaPtr& replicaService,
+ const AsteriskSCF::Discovery::SmartProxy<RtpStateReplicatorPrx>& stateReplicator,
+ const ConfigurationServiceImplPtr& configurationService) :
mImpl(new RTPSessionImplPriv(adapter, params->formats, replicaService, stateReplicator))
{
/* Add ourselves to the ICE ASM so we can be used. */
mImpl->mProxy = RTPSessionPrx::uncheckedCast(adapter->addWithUUID(this));
/* Create an endpoint in pjmedia for our media. */
- pj_status_t status = pjmedia_endpt_create(factory, NULL, 1, &mImpl->mEndpoint);
+ pj_status_t status = pjmedia_endpt_create(factory, NULL, configurationService->getWorkerThreadCount(), &mImpl->mEndpoint);
- if (status != PJ_SUCCESS)
- {
- /* TODO: This is bad... we can't go on! */
- }
+    assert(status == PJ_SUCCESS); /* the transport created below needs this endpoint */
int af = (params->ipv6 == true) ? pj_AF_INET6() : pj_AF_INET();
+ int minimumPort = configurationService->getStartPort();
+ int maximumPort = configurationService->getEndPort();
/* Now create some transport we can use to actually send or receive the media. */
- for (int port = DEFAULT_RTP_PORT_MINIMUM; port < DEFAULT_RTP_PORT_MAXIMUM; port += 2)
+ for (int port = minimumPort; port < maximumPort; port += 2)
{
if ((status = pjmedia_transport_udp_create3(mImpl->mEndpoint, af, "RTP", NULL, port, 0, &mImpl->mTransport)) ==
PJ_SUCCESS)
@@ -183,25 +185,22 @@ RTPSessionImpl::RTPSessionImpl(const Ice::ObjectAdapterPtr& adapter, const RTPSe
* Constructor for the RTPSessionImpl class (used by state replicator).
*/
RTPSessionImpl::RTPSessionImpl(const Ice::ObjectAdapterPtr& adapter, pj_pool_factory* factory,
- const Ice::Identity& sessionIdentity, const Ice::Identity& sinkIdentity, const Ice::Identity& sourceIdentity,
- Ice::Int port, const FormatSeq& formats, bool ipv6) :
+ const Ice::Identity& sessionIdentity, const Ice::Identity& sinkIdentity, const Ice::Identity& sourceIdentity,
+ Ice::Int port, const FormatSeq& formats, bool ipv6, const ConfigurationServiceImplPtr& configurationService) :
mImpl(new RTPSessionImplPriv(adapter, formats, 0, *(new AsteriskSCF::Discovery::SmartProxy<RtpStateReplicatorPrx>)))
{
mImpl->mProxy = RTPSessionPrx::uncheckedCast(adapter->add(this, sessionIdentity));
- pj_status_t status = pjmedia_endpt_create(factory, NULL, 1, &mImpl->mEndpoint);
+ pj_status_t status = pjmedia_endpt_create(factory, NULL, configurationService->getWorkerThreadCount(), &mImpl->mEndpoint);
- if (status != PJ_SUCCESS)
- {
- /* TODO: This is bad... we can't go on! */
- }
+    assert(status == PJ_SUCCESS); /* the transport created below needs this endpoint */
int af = (ipv6 == true) ? pj_AF_INET6() : pj_AF_INET();
if ((status = pjmedia_transport_udp_create3(mImpl->mEndpoint, af, "RTP", NULL, port, 0, &mImpl->mTransport))
!= PJ_SUCCESS)
{
- // TODO: This is also bad, something is using the port */
+ // TODO: This is also bad, something is using the port
}
mImpl->mStreamSource = new StreamSourceRTPImpl(this, "");
@@ -442,6 +441,7 @@ void RTPSessionImpl::replicateState(const RtpSessionStateItemPtr& session, const
}
catch (...)
{
+ mImpl->mStateReplicator->setState(items);
}
}
@@ -486,5 +486,6 @@ void RTPSessionImpl::removeState(const RtpSessionStateItemPtr& session, const Rt
}
catch (...)
{
+ mImpl->mStateReplicator->removeState(items);
}
}
diff --git a/src/RTPSession.h b/src/RTPSession.h
index 6638b14..4ba6e86 100644
--- a/src/RTPSession.h
+++ b/src/RTPSession.h
@@ -11,6 +11,8 @@
#include <boost/shared_ptr.hpp>
#include <AsteriskSCF/Discovery/SmartProxy.h>
+#include "RTPConfiguration.h"
+
/**
* Forward definition for our private implementation of RTPSession.
*/
@@ -44,9 +46,11 @@ class RTPSessionImpl : public AsteriskSCF::Media::RTP::V1::RTPSession
public:
RTPSessionImpl(const Ice::ObjectAdapterPtr&, const AsteriskSCF::Media::RTP::V1::RTPServiceLocatorParamsPtr&,
pj_pool_factory*, const AsteriskSCF::System::Component::V1::ReplicaPtr&,
- const AsteriskSCF::Discovery::SmartProxy<AsteriskSCF::Media::RTP::V1::RtpStateReplicatorPrx>&);
+ const AsteriskSCF::Discovery::SmartProxy<AsteriskSCF::Media::RTP::V1::RtpStateReplicatorPrx>&,
+ const ConfigurationServiceImplPtr&);
RTPSessionImpl(const Ice::ObjectAdapterPtr&, pj_pool_factory*, const Ice::Identity&, const Ice::Identity&,
- const Ice::Identity&, Ice::Int, const AsteriskSCF::Media::V1::FormatSeq&, bool);
+ const Ice::Identity&, Ice::Int, const AsteriskSCF::Media::V1::FormatSeq&, bool,
+ const ConfigurationServiceImplPtr&);
AsteriskSCF::Media::V1::StreamSourceSeq getSources(const Ice::Current&);
AsteriskSCF::Media::V1::StreamSinkSeq getSinks(const Ice::Current&);
std::string getId(const Ice::Current&);
diff --git a/src/RtpStateReplicator.h b/src/RtpStateReplicator.h
index 4ab9e64..13c9502 100644
--- a/src/RtpStateReplicator.h
+++ b/src/RtpStateReplicator.h
@@ -20,8 +20,12 @@
#include <AsteriskSCF/Replication/StateReplicator.h>
#include "RtpStateReplicationIf.h"
+
#include <boost/shared_ptr.hpp>
+#include "RTPConfiguration.h"
+
+
typedef AsteriskSCF::Replication::StateReplicator<
AsteriskSCF::Media::RTP::V1::RtpStateReplicator,
AsteriskSCF::Media::RTP::V1::RtpStateItemPtr,
@@ -38,7 +42,9 @@ class RtpStateReplicatorListenerI : public AsteriskSCF::Media::RTP::V1::RtpState
{
public:
RtpStateReplicatorListenerI(const Ice::ObjectAdapterPtr&, pj_pool_factory*,
- const AsteriskSCF::Media::RTP::V1::RtpGeneralStateItemPtr&);
+ const AsteriskSCF::Media::RTP::V1::RtpGeneralStateItemPtr&,
+ const ConfigurationServiceImplPtr&);
+ ~RtpStateReplicatorListenerI();
void stateRemoved(const Ice::StringSeq&, const Ice::Current&);
void stateSet(const AsteriskSCF::Media::RTP::V1::RtpStateItemSeq&, const Ice::Current&);
bool operator==(const RtpStateReplicatorListenerI &rhs);
diff --git a/src/RtpStateReplicatorListener.cpp b/src/RtpStateReplicatorListener.cpp
index 2bfab90..4f3cef1 100644
--- a/src/RtpStateReplicatorListener.cpp
+++ b/src/RtpStateReplicatorListener.cpp
@@ -55,8 +55,10 @@ struct RtpStateReplicatorListenerImpl
{
public:
RtpStateReplicatorListenerImpl(const Ice::ObjectAdapterPtr& adapter, pj_pool_factory *poolFactory,
- const RtpGeneralStateItemPtr& generalState)
- : mId(IceUtil::generateUUID()), mAdapter(adapter), mPoolFactory(poolFactory), mGeneralState(generalState) {}
+ const RtpGeneralStateItemPtr& generalState,
+ const ConfigurationServiceImplPtr& configurationService)
+ : mId(IceUtil::generateUUID()), mAdapter(adapter), mPoolFactory(poolFactory), mGeneralState(generalState),
+ mConfigurationService(configurationService) {}
void removeStateNoticeImpl(const Ice::StringSeq& itemKeys)
{
@@ -97,7 +99,8 @@ public:
RTPSessionImplPtr localSession =
new RTPSessionImpl(mImpl->mAdapter, mImpl->mPoolFactory, item->mSessionIdentity,
- item->mSinkIdentity, item->mSourceIdentity, item->mPort, item->mFormats, item->mIPv6);
+ item->mSinkIdentity, item->mSourceIdentity, item->mPort, item->mFormats, item->mIPv6,
+ mImpl->mConfigurationService);
localitem->setSession(localSession);
}
else
@@ -144,11 +147,15 @@ public:
Ice::ObjectAdapterPtr mAdapter;
pj_pool_factory *mPoolFactory;
RtpGeneralStateItemPtr mGeneralState;
+ ConfigurationServiceImplPtr mConfigurationService;
};
RtpStateReplicatorListenerI::RtpStateReplicatorListenerI(const Ice::ObjectAdapterPtr& adapter,
- pj_pool_factory *poolFactory, const RtpGeneralStateItemPtr& generalState)
- : mImpl(new RtpStateReplicatorListenerImpl(adapter, poolFactory, generalState))
+ pj_pool_factory *poolFactory, const RtpGeneralStateItemPtr& generalState,
+ const ConfigurationServiceImplPtr& configurationService)
+ : mImpl(new RtpStateReplicatorListenerImpl(adapter, poolFactory, generalState, configurationService)) {}
+
+RtpStateReplicatorListenerI::~RtpStateReplicatorListenerI()
{
}
commit 10900df1422c8cdd3c43173863a6c7f8087f9801
Author: Joshua Colp <jcolp at digium.com>
Date: Sun May 8 11:56:18 2011 -0300
Add support for IPv6.
diff --git a/local-slice/RtpStateReplicationIf.ice b/local-slice/RtpStateReplicationIf.ice
index 254b50e..90a534a 100644
--- a/local-slice/RtpStateReplicationIf.ice
+++ b/local-slice/RtpStateReplicationIf.ice
@@ -74,6 +74,7 @@ module V1
class RtpGeneralStateItem extends RtpStateItem
{
AsteriskSCF::Core::Discovery::V1::ServiceManagement *mServiceManagement;
+ string mComparatorId;
};
class RtpSessionStateItem extends RtpStateItem
@@ -84,6 +85,7 @@ module V1
Ice::Identity mSourceIdentity;
AsteriskSCF::Media::V1::FormatSeq mFormats;
PayloadMap mPayloadstoFormats;
+ bool mIPv6;
};
class RtpStreamSinkStateItem extends RtpStateItem
diff --git a/src/MediaRTPpjmedia.cpp b/src/MediaRTPpjmedia.cpp
index ab63f28..3fb0964 100644
--- a/src/MediaRTPpjmedia.cpp
+++ b/src/MediaRTPpjmedia.cpp
@@ -52,6 +52,7 @@ Logger lg = getLoggerFactory().getLogger("AsteriskSCF.MediaRTP");
static const string ReplicaServiceId("MediaRtpReplica");
static const string MediaServiceId("RTPMediaService");
+static const string MediaComparatorServiceId("RTPMediaServiceComparator");
/**
* Implementation of the RTPMediaService interface as defined in MediaRTPIf.ice
@@ -60,8 +61,8 @@ class RTPMediaServiceImpl : public RTPMediaService
{
public:
RTPMediaServiceImpl(const Ice::ObjectAdapterPtr&, const ReplicaPtr&,
- const AsteriskSCF::Discovery::SmartProxy<RtpStateReplicatorPrx>&);
- RTPSessionPrx allocate(const FormatSeq&, const Ice::Current&);
+ const AsteriskSCF::Discovery::SmartProxy<RtpStateReplicatorPrx>&);
+ RTPSessionPrx allocate(const RTPServiceLocatorParamsPtr&, const Ice::Current&);
pj_pool_factory *getPoolFactory() { return &mCachingPool.factory; };
private:
/**
@@ -161,6 +162,37 @@ private:
};
/**
+ * Implementation of the ServiceLocatorParamsCompare class
+ */
+class RTPMediaServiceCompareServiceImpl : public ServiceLocatorParamsCompare
+{
+public:
+    // Accepts only RTPServiceLocatorParams, and accepts a request for IPv6 only when
+    // the pjlib this component is built against has IPv6 support enabled.
+    bool isSupported(const ServiceLocatorParamsPtr& locatorParams, const Ice::Current&)
+    {
+        RTPServiceLocatorParamsPtr params = RTPServiceLocatorParamsPtr::dynamicCast(locatorParams);
+        if (!params)
+        {
+            return false;
+        }
+        bool result = true;
+        // This is done on purpose for additional checks in the future
+        if (params->ipv6 == true)
+        {
+            // pjlib defines PJ_HAS_IPV6 as 0 when IPv6 is disabled, so test its value
+#if defined(PJ_HAS_IPV6) && PJ_HAS_IPV6
+            result = true;
+#else
+            result = false;
+#endif
+        }
+
+        return result;
+    };
+};
+
+/**
* Implementation of the IceBox::Service class
*/
class MediaRTPpjmediaApp : public IceBox::Service
@@ -341,10 +373,10 @@ RTPMediaServiceImpl::RTPMediaServiceImpl(const Ice::ObjectAdapterPtr& adapter, c
/**
* Implementation of the allocate method as defined in MediaRTPIf.ice
*/
-RTPSessionPrx RTPMediaServiceImpl::allocate(const FormatSeq& formats, const Ice::Current&)
+RTPSessionPrx RTPMediaServiceImpl::allocate(const RTPServiceLocatorParamsPtr& params, const Ice::Current&)
{
RTPSessionImplPtr session =
- new RTPSessionImpl(mAdapter, formats, &mCachingPool.factory, mReplicaService, mStateReplicator);
+ new RTPSessionImpl(mAdapter, params, &mCachingPool.factory, mReplicaService, mStateReplicator);
return session->getProxy();
}
@@ -440,20 +472,33 @@ void MediaRTPpjmediaApp::start(const std::string&, const Ice::CommunicatorPtr& c
}
}
+ ServiceLocatorParamsComparePtr rtpmediacomparatorservice = new RTPMediaServiceCompareServiceImpl();
+ ServiceLocatorParamsComparePrx RTPMediaComparatorServiceProxy = ServiceLocatorParamsComparePrx::uncheckedCast(
+ mGlobalAdapter->add(rtpmediacomparatorservice, mCommunicator->stringToIdentity(MediaComparatorServiceId)));
+
+ if (mReplicaService->isActive() == true)
+ {
+ mGeneralState->mComparatorId = IceUtil::generateUUID();
+ management->addCompare(mGeneralState->mComparatorId, RTPMediaComparatorServiceProxy);
+ }
+
+
RTPMediaServicePrx RTPMediaServiceProxy = RTPMediaServicePrx::uncheckedCast(mGlobalAdapter->add(rtpmediaservice,
mCommunicator->stringToIdentity(MediaServiceId)));
- ServiceLocatorParamsPtr genericparams = new ServiceLocatorParams();
+ RTPServiceLocatorParamsPtr rtpparams = new RTPServiceLocatorParams();
if (mReplicaService->isActive() == true)
{
mGeneralState->mServiceManagement = ServiceManagementPrx::uncheckedCast(
management->addService(RTPMediaServiceProxy, "media_rtp_pjmedia"));
/* Now we can add some parameters to help find us. */
- genericparams->category = "rtp";
- mGeneralState->mServiceManagement->addLocatorParams(genericparams, "");
+ rtpparams->category = "rtp";
+ mGeneralState->mServiceManagement->addLocatorParams(rtpparams, mGeneralState->mComparatorId);
}
+ ServiceLocatorParamsPtr genericparams = new ServiceLocatorParams();
+
/* One must provide a component service to manage us, if someone wants to */
ComponentServicePtr ComponentService = new ComponentServicepjmediaImpl(*this, mGeneralState);
ComponentServicePrx ComponentServiceProxy =
@@ -483,7 +528,10 @@ void MediaRTPpjmediaApp::stop()
mComponentServiceManagement->unregister();
if (mReplicaService->isActive() == true)
{
- mGeneralState-> mServiceManagement->unregister();
+ mGeneralState->mServiceManagement->unregister();
+ ServiceLocatorManagementPrx management =
+ ServiceLocatorManagementPrx::checkedCast(mCommunicator->propertyToProxy("ServiceLocatorManagementProxy"));
+ management->removeCompare(mGeneralState->mComparatorId);
}
mCommunicator->destroy();
}
diff --git a/src/RTPSession.cpp b/src/RTPSession.cpp
index 2cddb39..bf97327 100644
--- a/src/RTPSession.cpp
+++ b/src/RTPSession.cpp
@@ -130,10 +130,10 @@ public:
/**
* Constructor for the RTPSessionImpl class (used by Ice).
*/
-RTPSessionImpl::RTPSessionImpl(const Ice::ObjectAdapterPtr& adapter, const FormatSeq& formats,
+RTPSessionImpl::RTPSessionImpl(const Ice::ObjectAdapterPtr& adapter, const RTPServiceLocatorParamsPtr& params,
pj_pool_factory* factory, const ReplicaPtr& replicaService,
const AsteriskSCF::Discovery::SmartProxy<RtpStateReplicatorPrx>& stateReplicator) :
- mImpl(new RTPSessionImplPriv(adapter, formats, replicaService, stateReplicator))
+ mImpl(new RTPSessionImplPriv(adapter, params->formats, replicaService, stateReplicator))
{
/* Add ourselves to the ICE ASM so we can be used. */
mImpl->mProxy = RTPSessionPrx::uncheckedCast(adapter->addWithUUID(this));
@@ -146,10 +146,12 @@ RTPSessionImpl::RTPSessionImpl(const Ice::ObjectAdapterPtr& adapter, const Forma
/* TODO: This is bad... we can't go on! */
}
+ int af = (params->ipv6 == true) ? pj_AF_INET6() : pj_AF_INET();
+
/* Now create some transport we can use to actually send or receive the media. */
for (int port = DEFAULT_RTP_PORT_MINIMUM; port < DEFAULT_RTP_PORT_MAXIMUM; port += 2)
{
- if ((status = pjmedia_transport_udp_create2(mImpl->mEndpoint, "RTP", NULL, port, 0, &mImpl->mTransport)) ==
+ if ((status = pjmedia_transport_udp_create3(mImpl->mEndpoint, af, "RTP", NULL, port, 0, &mImpl->mTransport)) ==
PJ_SUCCESS)
{
mImpl->mSessionStateItem->mPort = port;
@@ -160,7 +162,8 @@ RTPSessionImpl::RTPSessionImpl(const Ice::ObjectAdapterPtr& adapter, const Forma
// Initialize our session state item enough so that the state items for the source and sink can also be initialized.
mImpl->mSessionStateItem->key = mImpl->mSessionStateItem->mSessionId = IceUtil::generateUUID();
mImpl->mSessionStateItem->mSessionIdentity = mImpl->mProxy->ice_getIdentity();
- mImpl->mSessionStateItem->mFormats = formats;
+ mImpl->mSessionStateItem->mFormats = params->formats;
+ mImpl->mSessionStateItem->mIPv6 = params->ipv6;
/* First up for our own stuff is... a source! Media needs to come from somewhere, you know. */
mImpl->mStreamSource = new StreamSourceRTPImpl(this, mImpl->mSessionStateItem->key);
@@ -181,7 +184,7 @@ RTPSessionImpl::RTPSessionImpl(const Ice::ObjectAdapterPtr& adapter, const Forma
*/
RTPSessionImpl::RTPSessionImpl(const Ice::ObjectAdapterPtr& adapter, pj_pool_factory* factory,
const Ice::Identity& sessionIdentity, const Ice::Identity& sinkIdentity, const Ice::Identity& sourceIdentity,
- Ice::Int port, const FormatSeq& formats) :
+ Ice::Int port, const FormatSeq& formats, bool ipv6) :
mImpl(new RTPSessionImplPriv(adapter, formats, 0, *(new AsteriskSCF::Discovery::SmartProxy<RtpStateReplicatorPrx>)))
{
mImpl->mProxy = RTPSessionPrx::uncheckedCast(adapter->add(this, sessionIdentity));
@@ -193,7 +196,9 @@ RTPSessionImpl::RTPSessionImpl(const Ice::ObjectAdapterPtr& adapter, pj_pool_fac
/* TODO: This is bad... we can't go on! */
}
- if ((status = pjmedia_transport_udp_create2(mImpl->mEndpoint, "RTP", NULL, port, 0, &mImpl->mTransport))
+ int af = (ipv6 == true) ? pj_AF_INET6() : pj_AF_INET();
+
+ if ((status = pjmedia_transport_udp_create3(mImpl->mEndpoint, af, "RTP", NULL, port, 0, &mImpl->mTransport))
!= PJ_SUCCESS)
{
// TODO: This is also bad, something is using the port */
diff --git a/src/RTPSession.h b/src/RTPSession.h
index 1703cd0..6638b14 100644
--- a/src/RTPSession.h
+++ b/src/RTPSession.h
@@ -42,11 +42,11 @@ typedef IceUtil::Handle<StreamSourceRTPImpl> StreamSourceRTPImplPtr;
class RTPSessionImpl : public AsteriskSCF::Media::RTP::V1::RTPSession
{
public:
- RTPSessionImpl(const Ice::ObjectAdapterPtr&, const AsteriskSCF::Media::V1::FormatSeq&,
- pj_pool_factory*, const AsteriskSCF::System::Component::V1::ReplicaPtr&,
- const AsteriskSCF::Discovery::SmartProxy<AsteriskSCF::Media::RTP::V1::RtpStateReplicatorPrx>&);
+ RTPSessionImpl(const Ice::ObjectAdapterPtr&, const AsteriskSCF::Media::RTP::V1::RTPServiceLocatorParamsPtr&,
+ pj_pool_factory*, const AsteriskSCF::System::Component::V1::ReplicaPtr&,
+ const AsteriskSCF::Discovery::SmartProxy<AsteriskSCF::Media::RTP::V1::RtpStateReplicatorPrx>&);
RTPSessionImpl(const Ice::ObjectAdapterPtr&, pj_pool_factory*, const Ice::Identity&, const Ice::Identity&,
- const Ice::Identity&, Ice::Int, const AsteriskSCF::Media::V1::FormatSeq&);
+ const Ice::Identity&, Ice::Int, const AsteriskSCF::Media::V1::FormatSeq&, bool);
AsteriskSCF::Media::V1::StreamSourceSeq getSources(const Ice::Current&);
AsteriskSCF::Media::V1::StreamSinkSeq getSinks(const Ice::Current&);
std::string getId(const Ice::Current&);
diff --git a/src/RTPSink.cpp b/src/RTPSink.cpp
index 5c591b3..41e7b18 100644
--- a/src/RTPSink.cpp
+++ b/src/RTPSink.cpp
@@ -208,8 +208,17 @@ std::string StreamSinkRTPImpl::getRemoteAddress(const Ice::Current&)
pjmedia_transport_info_init(&transportInfo);
pjmedia_transport_get_info(mImpl->mSession->getTransport(), &transportInfo);
- string address = pj_inet_ntoa(transportInfo.src_rtp_name.ipv4.sin_addr);
- return (address != "0.0.0.0") ? address : mImpl->mSinkStateItem->mRemoteAddress;
+ if (transportInfo.src_rtp_name.addr.sa_family != PJ_AF_INET &&
+ transportInfo.src_rtp_name.addr.sa_family != PJ_AF_INET6)
+ {
+ // If we have no remote address yet (we know because the default initialization
+ // for the above is neither PJ_AF_INET or PJ_AF_INET6) then return whatever
+ // remote address we have been told, heck, it could be blank!
+ return mImpl->mSinkStateItem->mRemoteAddress;
+ }
+
+ char tmp_addr[PJ_INET6_ADDRSTRLEN];
+ return pj_sockaddr_print(&transportInfo.src_rtp_name, tmp_addr, sizeof(tmp_addr), 0);
}
/**
@@ -222,8 +231,13 @@ Ice::Int StreamSinkRTPImpl::getRemotePort(const Ice::Current&)
pjmedia_transport_info_init(&transportInfo);
pjmedia_transport_get_info(mImpl->mSession->getTransport(), &transportInfo);
- int port = pj_ntohs(transportInfo.src_rtp_name.ipv4.sin_port);
- return (port != 0) ? port : mImpl->mSinkStateItem->mRemotePort;
+ if (transportInfo.src_rtp_name.addr.sa_family != PJ_AF_INET &&
+ transportInfo.src_rtp_name.addr.sa_family != PJ_AF_INET6)
+ {
+ return mImpl->mSinkStateItem->mRemotePort;
+ }
+
+ return pj_sockaddr_get_port(&transportInfo.src_rtp_name);
}
/**
diff --git a/src/RTPSource.cpp b/src/RTPSource.cpp
index 6a8e1a7..3bc22c1 100644
--- a/src/RTPSource.cpp
+++ b/src/RTPSource.cpp
@@ -140,7 +140,8 @@ std::string StreamSourceRTPImpl::getLocalAddress(const Ice::Current&)
pjmedia_transport_info_init(&transportInfo);
pjmedia_transport_get_info(mImpl->mSession->getTransport(), &transportInfo);
- return pj_inet_ntoa(transportInfo.sock_info.rtp_addr_name.ipv4.sin_addr);
+ char tmp_addr[PJ_INET6_ADDRSTRLEN];
+ return pj_sockaddr_print(&transportInfo.sock_info.rtp_addr_name, tmp_addr, sizeof(tmp_addr), 0);
}
/**
@@ -153,7 +154,7 @@ Ice::Int StreamSourceRTPImpl::getLocalPort(const Ice::Current&)
pjmedia_transport_info_init(&transportInfo);
pjmedia_transport_get_info(mImpl->mSession->getTransport(), &transportInfo);
- return pj_ntohs(transportInfo.sock_info.rtp_addr_name.ipv4.sin_port);
+ return pj_sockaddr_get_port(&transportInfo.sock_info.rtp_addr_name);
}
/**
@@ -232,24 +233,42 @@ static void receiveRTP(void *userdata, void *packet, pj_ssize_t size)
*/
void StreamSourceRTPImpl::setRemoteDetails(const string& address, Ice::Int port)
{
- pj_sockaddr_in sin;
+ pj_sockaddr addr;
/* This feels so dirty but convert from our std::string to a pj_str, since their API requires it. */
pj_str_t tmpAddress;
pj_strset(&tmpAddress, (char*)address.c_str(), address.size());
- /* Now for the next trick - convert into a pj_sockaddr_in so we can pass it to pjmedia_transport_attach */
- pj_sockaddr_in_init(&sin, &tmpAddress, (pj_uint16_t) port);
+ /* Now for the next trick - convert into a pj_sockaddr so we can pass it to pjmedia_transport_attach */
+ pj_status_t status = pj_sockaddr_parse(pj_AF_UNSPEC(), 0, &tmpAddress, &addr);
+
+ if (status != PJ_SUCCESS)
+ {
+ throw InvalidAddress();
+ }
+
+ // Confirm that the address family of the address matches that of this RTP session
+ pjmedia_transport_info transportInfo;
+
+ pjmedia_transport_info_init(&transportInfo);
+ pjmedia_transport_get_info(mImpl->mSession->getTransport(), &transportInfo);
+
+ if (transportInfo.sock_info.rtp_addr_name.addr.sa_family != addr.addr.sa_family)
+ {
+ throw InvalidAddress();
+ }
+
+ pj_sockaddr_set_port(&addr, static_cast<pj_uint16_t>(port));
/* In case we were already attached go ahead and detach */
pjmedia_transport_detach(mImpl->mSession->getTransport(), this);
/* All ready... actually do it! */
- pj_status_t status = pjmedia_transport_attach(mImpl->mSession->getTransport(), this, &sin, NULL, sizeof(pj_sockaddr_in), &receiveRTP, NULL);
+ status = pjmedia_transport_attach(mImpl->mSession->getTransport(), this, &addr, NULL, pj_sockaddr_get_len(&addr), &receiveRTP, NULL);
if (status != PJ_SUCCESS)
{
- /* TODO: Decide what to do if this occurs, do we need an exception? */
+ throw InvalidAddress();
}
}
diff --git a/src/RtpStateReplicatorListener.cpp b/src/RtpStateReplicatorListener.cpp
index fd5694c..2bfab90 100644
--- a/src/RtpStateReplicatorListener.cpp
+++ b/src/RtpStateReplicatorListener.cpp
@@ -81,6 +81,7 @@ public:
void visitRtpGeneralStateItem(const RtpGeneralStateItemPtr &item)
{
mImpl->mGeneralState->mServiceManagement = item->mServiceManagement;
+ mImpl->mGeneralState->mComparatorId = item->mComparatorId;
}
void visitRtpSessionStateItem(const RtpSessionStateItemPtr &item)
@@ -96,7 +97,7 @@ public:
RTPSessionImplPtr localSession =
new RTPSessionImpl(mImpl->mAdapter, mImpl->mPoolFactory, item->mSessionIdentity,
- item->mSinkIdentity, item->mSourceIdentity, item->mPort, item->mFormats);
+ item->mSinkIdentity, item->mSourceIdentity, item->mPort, item->mFormats, item->mIPv6);
localitem->setSession(localSession);
}
else
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index c82fb29..f65d5b2 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -9,7 +9,16 @@ asterisk_scf_component_add_boost_libraries(media_rtp_pjmedia_test unit_test_fram
asterisk_scf_component_build_icebox(media_rtp_pjmedia_test)
target_link_libraries(media_rtp_pjmedia_test asterisk-scf-api)
+add_definitions(-DIPV6_TEST)
+asterisk_scf_component_init(media_rtp_pjmedia_test_v6)
+asterisk_scf_component_add_file(media_rtp_pjmedia_test_v6 TestRTPpjmedia.cpp)
+asterisk_scf_component_add_slice(media_rtp_pjmedia_test_v6 ../local-slice/RtpStateReplicationIf.ice)
+asterisk_scf_component_add_boost_libraries(media_rtp_pjmedia_test_v6 unit_test_framework thread date_time)
+asterisk_scf_component_build_icebox(media_rtp_pjmedia_test_v6)
+target_link_libraries(media_rtp_pjmedia_test_v6 asterisk-scf-api)
+
# integration test
if(integrated_build STREQUAL "true")
asterisk_scf_test_icebox(media_rtp_pjmedia_test config/test_component.config)
+ asterisk_scf_test_icebox(media_rtp_pjmedia_test_v6 config/test_component.config)
endif()
diff --git a/test/TestRTPpjmedia.cpp b/test/TestRTPpjmedia.cpp
index a13f157..810963e 100644
--- a/test/TestRTPpjmedia.cpp
+++ b/test/TestRTPpjmedia.cpp
@@ -13,7 +13,11 @@
* the GNU General Public License Version 2. See the LICENSE.txt file
* at the top of the source tree.
*/
+#ifdef IPV6_TEST
+#define BOOST_TEST_MODULE RTPpjmediaTestSuitev6
+#else
#define BOOST_TEST_MODULE RTPpjmediaTestSuite
+#endif
#define BOOST_TEST_NO_MAIN
#include <boost/test/unit_test.hpp>
@@ -313,7 +317,7 @@ BOOST_AUTO_TEST_CASE(ServiceFoundUsingName)
bool found = false;
try {
- ServiceLocatorParamsPtr params = new ServiceLocatorParams();
+ RTPServiceLocatorParamsPtr params = new RTPServiceLocatorParams();
params->category = "rtp";
Testbed.locator->locate(params);
@@ -396,6 +400,8 @@ BOOST_AUTO_TEST_CASE(CheckReplicatedGeneralStateItem)
}
BOOST_CHECK(Testbed.mListener->mGeneral);
+ BOOST_CHECK(Testbed.mListener->mGeneral->mServiceManagement);
+ BOOST_CHECK(Testbed.mListener->mGeneral->mComparatorId.size());
}
/**
@@ -407,8 +413,11 @@ BOOST_AUTO_TEST_CASE(AllocateRTPSession)
try
{
- ServiceLocatorParamsPtr params = new ServiceLocatorParams();
+ RTPServiceLocatorParamsPtr params = new RTPServiceLocatorParams();
params->category = "rtp";
+#ifdef IPV6_TEST
+ params->ipv6 = true;
+#endif
RTPMediaServicePrx service = RTPMediaServicePrx::uncheckedCast(Testbed.locator->locate(params));
@@ -422,7 +431,7 @@ BOOST_AUTO_TEST_CASE(AllocateRTPSession)
// You might think "geez, this should deadlock due to state replication" but no, we use one ways for that
boost::mutex::scoped_lock lock(Testbed.mLock);
- Testbed.session = service->allocate(formats);
+ Testbed.session = service->allocate(params);
// Give the RTP component time to replicate this session
Testbed.mCondition.wait(lock);
@@ -449,6 +458,12 @@ BOOST_AUTO_TEST_CASE(ConfirmInitialReplicatedRTPSession)
BOOST_CHECK(Testbed.mListener->mSession);
BOOST_CHECK(Testbed.mListener->mSession->mSessionIdentity == Testbed.session->ice_getIdentity());
+#ifdef IPV6_TEST
+ BOOST_CHECK(Testbed.mListener->mSession->mIPv6 == true);
+#else
+ BOOST_CHECK(Testbed.mListener->mSession->mIPv6 == false);
+#endif
+
StreamSinkSeq sinks = Testbed.session->getSinks();
StreamSinkRTPPrx sink = StreamSinkRTPPrx::uncheckedCast(sinks.front());
BOOST_CHECK(Testbed.mListener->mSession->mSinkIdentity == sink->ice_getIdentity());
@@ -521,8 +536,15 @@ BOOST_AUTO_TEST_CASE(VerifyLocalAddressonSources)
for (StreamSourceSeq::const_iterator i = sources.begin(); i != sources.end(); ++i)
{
StreamSourceRTPPrx source = StreamSourceRTPPrx::checkedCast((*i));
+ std::string address = source->getLocalAddress();
+
+#ifdef IPV6_TEST
+ std::string valid_char = ":";
+#else
+ std::string valid_char = ".";
+#endif
- if (source->getLocalAddress().empty() || !source->getLocalPort())
+ if (address.empty() || !source->getLocalPort() || address.find(valid_char) == string::npos)
{
validaddresses = false;
}
@@ -669,9 +691,14 @@ BOOST_AUTO_TEST_CASE(ConfirmRemoteAddressSetting)
{
StreamSinkRTPPrx sink = StreamSinkRTPPrx::checkedCast((*i));
- sink->setRemoteDetails("127.0.0.1", 10000);
+#ifdef IPV6_TEST
+ std::string address = "::1";
+#else
+ std::string address = "127.0.0.1";
+#endif
+ sink->setRemoteDetails(address, 10000);
- if (sink->getRemoteAddress() == "127.0.0.1" && sink->getRemotePort() == 10000)
+ if (sink->getRemoteAddress() == address && sink->getRemotePort() == 10000)
{
set = true;
}
@@ -952,8 +979,11 @@ BOOST_AUTO_TEST_CASE(ReceiveUnknownRTPPacket)
try
{
- ServiceLocatorParamsPtr params = new ServiceLocatorParams();
+ RTPServiceLocatorParamsPtr params = new RTPServiceLocatorParams();
params->category = "rtp";
+#ifdef IPV6_TEST
+ params->ipv6 = true;
+#endif
RTPMediaServicePrx service = RTPMediaServicePrx::uncheckedCast(Testbed.locator->locate(params));
@@ -964,7 +994,7 @@ BOOST_AUTO_TEST_CASE(ReceiveUnknownRTPPacket)
FormatSeq formats;
formats.push_back(format);
- RTPSessionPrx session = service->allocate(formats);
+ RTPSessionPrx session = service->allocate(params);
PayloadMap mapping;
mapping.insert(make_pair(13, format));
@@ -1017,6 +1047,46 @@ BOOST_AUTO_TEST_CASE(ReceiveUnknownRTPPacket)
}
/**
+ * Attempt to set an IPv4 address on an IPv6 only sink OR try to set an IPv6 address
+ * on an IPv4 only sink.
+ *
+ * For every sink returned by Testbed.session->getSinks() the test calls
+ * setRemoteDetails() with a loopback address of the opposite family ("127.0.0.1"
+ * when IPV6_TEST is defined, "::1" otherwise) and port 10000. The flag 'set' only
+ * becomes true if that call returns without throwing, and the test passes when
+ * 'set' is still false at the end.
+ *
+ * NOTE(review): any exception raised inside the try block (including one from
+ * getSinks() or checkedCast()) leaves 'set' false, as does an empty sink sequence,
+ * so the final check does not prove that setRemoteDetails() itself rejected the
+ * address.
+ */
+BOOST_AUTO_TEST_CASE(SetInvalidAddressFamilyAddress)
+{
+ bool set = false;
+
+ try
+ {
+ StreamSinkSeq sinks = Testbed.session->getSinks();
+
+ for (StreamSinkSeq::const_iterator i = sinks.begin(); i != sinks.end(); ++i)
+ {
+ StreamSinkRTPPrx sink = StreamSinkRTPPrx::checkedCast((*i));
+
+ /* No, these are not accidentally reversed. We want to try to set an IPv4 address
+ * on an IPv6 sink and vice versa to make sure that an exception does get thrown.
+ */
+#ifdef IPV6_TEST
+ sink->setRemoteDetails("127.0.0.1", 10000);
+#else
+ sink->setRemoteDetails("::1", 10000);
+#endif
+
+ set = true; // Only reached when the mismatched address was accepted
+ }
+ }
+ catch (const Ice::Exception &e)
+ {
+ BOOST_TEST_MESSAGE(e.ice_name());
+ BOOST_TEST_MESSAGE(e.what());
+ }
+ catch (...) // Any non-Ice exception is swallowed and also leaves 'set' false
+ {
+ }
+
+ BOOST_CHECK(!set); // Passes only if no setRemoteDetails() call returned normally
+}
+
+/**
* Attempt to release our RTP session
*/
BOOST_AUTO_TEST_CASE(ReleaseRTPSession)
-----------------------------------------------------------------------
--
asterisk-scf/integration/media_rtp_pjmedia.git
More information about the asterisk-scf-commits mailing list