[asterisk-scf-commits] asterisk-scf/integration/media_rtp_pjmedia.git branch "configuration" updated.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Tue May 3 09:21:53 CDT 2011


branch "configuration" has been updated
       via  eed17133034e54adb065166ad1795a62c2d73f41 (commit)
      from  1467fd724497c5b072d3fafdc03b9d52bb9cb731 (commit)

Summary of changes:
 local-slice/RtpConfigurationIf.ice |    6 +
 src/CMakeLists.txt                 |    2 +
 src/MediaRTPpjmedia.cpp            |   32 +++-
 src/RTPConfiguration.cpp           |  381 ++++++++++++++++++++++++++++++++++++
 src/RTPConfiguration.h             |   64 ++++++
 src/RTPSession.cpp                 |   24 ++-
 src/RTPSession.h                   |   11 +-
 src/RtpStateReplicator.h           |    4 +-
 src/RtpStateReplicatorListener.cpp |   15 +-
 9 files changed, 515 insertions(+), 24 deletions(-)
 create mode 100644 src/RTPConfiguration.cpp
 create mode 100644 src/RTPConfiguration.h


- Log -----------------------------------------------------------------
commit eed17133034e54adb065166ad1795a62c2d73f41
Author: Joshua Colp <jcolp at digium.com>
Date:   Tue May 3 11:21:39 2011 -0300

    Add support for receiving configuration using the configuration interface.

diff --git a/local-slice/RtpConfigurationIf.ice b/local-slice/RtpConfigurationIf.ice
index 62d3f12..957bb99 100644
--- a/local-slice/RtpConfigurationIf.ice
+++ b/local-slice/RtpConfigurationIf.ice
@@ -67,6 +67,9 @@ module V1
 
    /**
     * Port ranges configuartion item
+    *
+    * This must be added to the general configuration group using the name 'ports'
+    *
     */
    class PortRangesItem extends RtpConfigurationItem
    {
@@ -89,6 +92,9 @@ module V1
 
    /**
     * Worker thread count for incoming media configuration item
+    *
+    * This must be added to the general configuration group using the name 'workerThreadCount'
+    *
     */
    class WorkerThreadCountItem extends RtpConfigurationItem
    {
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 75a557e..175d1b2 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -18,6 +18,8 @@ asterisk_scf_component_add_file(media_rtp_pjmedia RTPSource.h)
 asterisk_scf_component_add_file(media_rtp_pjmedia RTPSink.h)
 asterisk_scf_component_add_file(media_rtp_pjmedia RtpStateReplicatorListener.cpp)
 asterisk_scf_component_add_file(media_rtp_pjmedia RtpStateReplicator.h)
+asterisk_scf_component_add_file(media_rtp_pjmedia RTPConfiguration.cpp)
+asterisk_scf_component_add_file(media_rtp_pjmedia RTPConfiguration.h)
 asterisk_scf_component_add_slice(media_rtp_pjmedia ../local-slice/RtpStateReplicationIf.ice)
 asterisk_scf_component_add_slice(media_rtp_pjmedia ../local-slice/RtpConfigurationIf.ice)
 asterisk_scf_component_add_boost_libraries(media_rtp_pjmedia core thread)
diff --git a/src/MediaRTPpjmedia.cpp b/src/MediaRTPpjmedia.cpp
index 3c1343e..c21e9d5 100644
--- a/src/MediaRTPpjmedia.cpp
+++ b/src/MediaRTPpjmedia.cpp
@@ -36,6 +36,7 @@
 
 #include "RTPSession.h"
 #include "RtpStateReplicator.h"
+#include "RTPConfiguration.h"
 
 using namespace std;
 using namespace AsteriskSCF::Core::Discovery::V1;
@@ -52,6 +53,7 @@ Logger lg = getLoggerFactory().getLogger("AsteriskSCF.MediaRTP");
 
 static const string ReplicaServiceId("MediaRtpReplica");
 static const string MediaServiceId("RTPMediaService");
+static const string ConfigurationServiceId("RTPConfigurationService");
 
 /**
  * Implementation of the RTPMediaService interface as defined in MediaRTPIf.ice
@@ -60,7 +62,8 @@ class RTPMediaServiceImpl : public RTPMediaService
 {
 public:
     RTPMediaServiceImpl(const Ice::ObjectAdapterPtr&, const ReplicaPtr&,
-            const AsteriskSCF::SmartProxy::SmartProxy<RtpStateReplicatorPrx>&);
+	const AsteriskSCF::SmartProxy::SmartProxy<RtpStateReplicatorPrx>&,
+	const ConfigurationServiceImplPtr&);
     RTPSessionPrx allocate(const FormatSeq&, const Ice::Current&);
     pj_pool_factory *getPoolFactory() { return &mCachingPool.factory; };
 private:
@@ -85,6 +88,11 @@ private:
     ReplicaPtr mReplicaService;
 
     /**
+     * A pointer to the configuration service.
+     */
+    ConfigurationServiceImplPtr mConfigurationService;
+
+    /**
      * A proxy to the state replicator.
      */
     AsteriskSCF::SmartProxy::SmartProxy<RtpStateReplicatorPrx> mStateReplicator;
@@ -202,6 +210,11 @@ private:
     ReplicaPtr mReplicaService;
 
     /**
+     * Instance of our configuration service implementation.
+     */
+    ConfigurationServiceImplPtr mConfigurationService;
+
+    /**
      * Instance of our state replicator listener.
      */
     RtpStateReplicatorListenerPtr mReplicatorListener;
@@ -328,8 +341,10 @@ private:
  * Constructor for the RTPMediaServiceImpl class.
  */
 RTPMediaServiceImpl::RTPMediaServiceImpl(const Ice::ObjectAdapterPtr& adapter, const ReplicaPtr& replicaService,
-        const AsteriskSCF::SmartProxy::SmartProxy<RtpStateReplicatorPrx>& stateReplicator) :
-    mAdapter(adapter), mReplicaService(replicaService), mStateReplicator(stateReplicator)
+    const AsteriskSCF::SmartProxy::SmartProxy<RtpStateReplicatorPrx>& stateReplicator,
+    const ConfigurationServiceImplPtr& configurationService) :
+    mAdapter(adapter), mReplicaService(replicaService), mConfigurationService(configurationService),
+    mStateReplicator(stateReplicator)
 {
     /* Initialize the memory caching pool using default policy as specified by pjlib. */
     pj_caching_pool_init(&mCachingPool, &pj_pool_factory_default_policy, 0);
@@ -344,7 +359,8 @@ RTPMediaServiceImpl::RTPMediaServiceImpl(const Ice::ObjectAdapterPtr& adapter, c
 RTPSessionPrx RTPMediaServiceImpl::allocate(const FormatSeq& formats, const Ice::Current&)
 {
     RTPSessionImplPtr session =
-        new RTPSessionImpl(mAdapter, formats, &mCachingPool.factory, mReplicaService, mStateReplicator);
+        new RTPSessionImpl(mAdapter, formats, &mCachingPool.factory, mReplicaService, mStateReplicator,
+	    mConfigurationService);
     return session->getProxy();
 }
 
@@ -388,6 +404,9 @@ void MediaRTPpjmediaApp::start(const std::string&, const Ice::CommunicatorPtr& c
     mReplicaService = new ReplicaImpl(mLocalAdapter);
     mLocalAdapter->add(mReplicaService, mCommunicator->stringToIdentity(ReplicaServiceId));
 
+    mConfigurationService = new ConfigurationServiceImpl();
+    mLocalAdapter->add(mConfigurationService, mCommunicator->stringToIdentity(ConfigurationServiceId));
+
     mLocalAdapter->activate();
 
     mGlobalAdapter = mCommunicator->createObjectAdapter("MediaRTPpjmediaAdapter");
@@ -419,12 +438,13 @@ void MediaRTPpjmediaApp::start(const std::string&, const Ice::CommunicatorPtr& c
     }
 
     RTPMediaServiceImplPtr rtpmediaservice =
-        new RTPMediaServiceImpl(mGlobalAdapter, mReplicaService, mStateReplicator);
+        new RTPMediaServiceImpl(mGlobalAdapter, mReplicaService, mStateReplicator, mConfigurationService);
 
     if (mStateReplicator)
     {
         mReplicatorListener =
-            new RtpStateReplicatorListenerI(mGlobalAdapter, rtpmediaservice->getPoolFactory(), mGeneralState);
+            new RtpStateReplicatorListenerI(mGlobalAdapter, rtpmediaservice->getPoolFactory(), mGeneralState,
+		mConfigurationService);
         mReplicatorListenerProxy =
             RtpStateReplicatorListenerPrx::uncheckedCast(mLocalAdapter->addWithUUID(mReplicatorListener));
 
diff --git a/src/RTPConfiguration.cpp b/src/RTPConfiguration.cpp
new file mode 100644
index 0000000..5c65808
--- /dev/null
+++ b/src/RTPConfiguration.cpp
@@ -0,0 +1,381 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2011, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+
+#include <IceUtil/UUID.h>
+
+#include <boost/thread.hpp>
+#include <boost/shared_ptr.hpp>
+
+#include <AsteriskSCF/System/Component/ConfigurationIf.h>
+
+#include "RtpConfigurationIf.h"
+#include "RTPConfiguration.h"
+
+using namespace AsteriskSCF::System::Configuration::V1;
+using namespace AsteriskSCF::Media::RTP::V1;
+
+class ConfigurationServiceImplPriv
+{
+public:
+    /**
+     * Constructor for this private class; defined inline since this file has no out-of-line definition.
+     */
+    ConfigurationServiceImplPriv() { }
+
+    /**
+     * General RTP configuration; a null handle while no general group is stored.
+     */
+    RtpGeneralGroupPtr mGeneralGroup;
+};
+
+ConfigurationServiceImpl::ConfigurationServiceImpl() : mImplPriv(new ConfigurationServiceImplPriv())
+{
+}
+
+/**
+ * Returns, for each requested general group, the locally stored items whose keys were requested.
+ */
+ConfigurationGroupSeq ConfigurationServiceImpl::getConfiguration(const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq& groups, const Ice::Current&)
+{
+    class visitor : public RtpConfigurationGroupVisitor
+    {
+    public:
+        visitor(boost::shared_ptr<ConfigurationServiceImplPriv> implPriv, ConfigurationGroupSeq& visitorGroups) : mImplPriv(implPriv), mGroups(visitorGroups) { };
+
+    private:
+        /**
+         * Internal helper function which determines what configuration items should be returned
+         */
+        void insertRequestedConfigurationItems(ConfigurationItemDict& requestedItems,
+            ConfigurationItemDict& localItems,
+            ConfigurationItemDict& returnedItems)
+        {
+            for (ConfigurationItemDict::iterator requestedItem = requestedItems.begin(); requestedItem != requestedItems.end(); ++requestedItem)
+            {
+                ConfigurationItemDict::iterator localItem = localItems.find((*requestedItem).first);
+
+                if (localItem == localItems.end())
+                {
+                    continue;
+                }
+
+                // Return the item held locally rather than echoing back the object sent in the request.
+                returnedItems.insert(make_pair((*localItem).first, (*localItem).second));
+            }
+        }
+
+        void visitRtpGeneralGroup(const ::AsteriskSCF::Media::RTP::V1::RtpGeneralGroupPtr& group)
+        {
+            if (!mImplPriv->mGeneralGroup)
+            {
+                return;
+            }
+
+            RtpGeneralGroupPtr returnedGroup = new RtpGeneralGroup();
+            insertRequestedConfigurationItems(group->configurationItems, mImplPriv->mGeneralGroup->configurationItems, returnedGroup->configurationItems);
+
+            mGroups.push_back(returnedGroup);
+        };
+
+        boost::shared_ptr<ConfigurationServiceImplPriv> mImplPriv;
+        ConfigurationGroupSeq& mGroups;
+    };
+
+    ConfigurationGroupSeq newGroups;
+    RtpConfigurationGroupVisitorPtr v = new visitor(mImplPriv, newGroups);
+
+    for (ConfigurationGroupSeq::const_iterator group = groups.begin(); group != groups.end(); ++group)
+    {
+        (*group)->visit(v);
+    }
+
+    return newGroups;
+}
+
+/**
+ * Returns the stored general group once for every requested group visited as an RtpGeneralGroup.
+ */
+ConfigurationGroupSeq ConfigurationServiceImpl::getConfigurationAll(const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq& groups, const Ice::Current&)
+{
+    // Appends the stored group, if any, to the result sequence it was given.
+    class wholeGroupVisitor : public RtpConfigurationGroupVisitor
+    {
+    public:
+        wholeGroupVisitor(boost::shared_ptr<ConfigurationServiceImplPriv> priv, ConfigurationGroupSeq& results) :
+            mPriv(priv), mResults(results) { };
+
+    private:
+        void visitRtpGeneralGroup(const ::AsteriskSCF::Media::RTP::V1::RtpGeneralGroupPtr&)
+        {
+            if (mPriv->mGeneralGroup)
+            {
+                mResults.push_back(mPriv->mGeneralGroup);
+            }
+        };
+
+        boost::shared_ptr<ConfigurationServiceImplPriv> mPriv;
+        ConfigurationGroupSeq& mResults;
+    };
+
+    ConfigurationGroupSeq collected;
+    RtpConfigurationGroupVisitorPtr collector = new wholeGroupVisitor(mImplPriv, collected);
+    for (ConfigurationGroupSeq::const_iterator it = groups.begin(); it != groups.end(); ++it)
+    {
+        (*it)->visit(collector);
+    }
+    return collected;
+}
+
+/**
+ * Lists the groups that hold configuration: an empty RtpGeneralGroup stands in for the stored one.
+ */
+ConfigurationGroupSeq ConfigurationServiceImpl::getConfigurationGroups(const Ice::Current&)
+{
+    ConfigurationGroupSeq present;
+    if (mImplPriv->mGeneralGroup)
+    {
+        present.push_back(RtpGeneralGroupPtr(new RtpGeneralGroup()));
+    }
+    return present;
+}
+
+/**
+ * Stores every item of each pushed general group, creating the local group on first use.
+ * Items whose key already exists locally are replaced by the pushed item.
+ */
+void ConfigurationServiceImpl::setConfiguration(const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq& groups, const Ice::Current&)
+{
+    class groupsVisitor : public RtpConfigurationGroupVisitor
+    {
+    public:
+        groupsVisitor(boost::shared_ptr<ConfigurationServiceImplPriv> implPriv) : mImplPriv(implPriv) { };
+
+    private:
+        /**
+         * Helper function which performs serial number checking of items (stale serials are found but not yet rejected)
+         */
+        void performSerialCheck(ConfigurationItemDict& changedItems, ConfigurationItemDict& localItems)
+        {
+            for (ConfigurationItemDict::iterator item = changedItems.begin(); item != changedItems.end(); ++item)
+            {
+                // If serial checking is to be skipped for this item just skip over it
+                if ((*item).second->serialNumber == -1)
+                {
+                    continue;
+                }
+
+                ConfigurationItemDict::iterator localItem = localItems.find((*item).first);
+
+                if (localItem == localItems.end())
+                {
+                    // This is a new item so serial checking does not apply
+                    continue;
+                }
+
+                if ((*item).second->serialNumber < (*localItem).second->serialNumber)
+                {
+                    /* XXX Need to throw the exception */
+                }
+            }
+        }
+
+        void visitRtpGeneralGroup(const ::AsteriskSCF::Media::RTP::V1::RtpGeneralGroupPtr& group)
+        {
+            if (!mImplPriv->mGeneralGroup)
+            {
+                mImplPriv->mGeneralGroup = new RtpGeneralGroup();
+            }
+            else
+            {
+                performSerialCheck(group->configurationItems, mImplPriv->mGeneralGroup->configurationItems);
+            }
+
+            for (ConfigurationItemDict::const_iterator item = group->configurationItems.begin(); item != group->configurationItems.end(); ++item)
+            {
+                // Assign through operator[]: insert() leaves an existing key untouched, so updates would be lost.
+                mImplPriv->mGeneralGroup->configurationItems[(*item).first] = (*item).second;
+            }
+        }
+
+        boost::shared_ptr<ConfigurationServiceImplPriv> mImplPriv;
+    };
+
+    RtpConfigurationGroupVisitorPtr v = new groupsVisitor(mImplPriv);
+    for (ConfigurationGroupSeq::const_iterator group = groups.begin(); group != groups.end(); ++group)
+    {
+        (*group)->visit(v);
+    }
+}
+
+/**
+ * Erases from the stored general group every item whose key appears in a passed-in general group.
+ */
+void ConfigurationServiceImpl::removeConfigurationItems(const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq& groups, const Ice::Current&)
+{
+    class groupsVisitor : public RtpConfigurationGroupVisitor
+    {
+    public:
+        groupsVisitor(boost::shared_ptr<ConfigurationServiceImplPriv> priv) : mImplPriv(priv) { };
+
+        // Drops each doomed key found in stored; a non-null itemVisitor visits the doomed item first.
+        void removeItems(RtpConfigurationItemVisitor* itemVisitor, ConfigurationItemDict& doomed,
+            ConfigurationItemDict& stored)
+        {
+            ConfigurationItemDict::const_iterator entry = doomed.begin();
+            while (entry != doomed.end())
+            {
+                ConfigurationItemDict::iterator match = stored.find(entry->first);
+                if (match != stored.end())
+                {
+                    if (itemVisitor != 0)
+                    {
+                        entry->second->visit(itemVisitor);
+                    }
+                    stored.erase(match);
+                }
+                ++entry;
+            }
+        }
+
+        void visitRtpGeneralGroup(const ::AsteriskSCF::Media::RTP::V1::RtpGeneralGroupPtr& group)
+        {
+            if (mImplPriv->mGeneralGroup)
+            {
+                removeItems(0, group->configurationItems, mImplPriv->mGeneralGroup->configurationItems);
+            }
+        };
+
+    private:
+        boost::shared_ptr<ConfigurationServiceImplPriv> mImplPriv;
+    };
+
+    RtpConfigurationGroupVisitorPtr remover = new groupsVisitor(mImplPriv);
+    for (ConfigurationGroupSeq::const_iterator it = groups.begin(); it != groups.end(); ++it)
+    {
+        (*it)->visit(remover);
+    }
+}
+
+/**
+ * Discards the stored general group when a requested group is visited as an RtpGeneralGroup.
+ */
+void ConfigurationServiceImpl::removeConfigurationGroups(const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq& groups, const Ice::Current&)
+{
+    class groupDropper : public RtpConfigurationGroupVisitor
+    {
+    public:
+        groupDropper(boost::shared_ptr<ConfigurationServiceImplPriv> priv) : mPriv(priv) { };
+
+    private:
+        void visitRtpGeneralGroup(const ::AsteriskSCF::Media::RTP::V1::RtpGeneralGroupPtr&)
+        {
+            if (mPriv->mGeneralGroup)
+            {
+                /* XXX What should we do as a result of this going away? */
+                mPriv->mGeneralGroup = 0;
+            }
+        };
+
+        boost::shared_ptr<ConfigurationServiceImplPriv> mPriv;
+    };
+
+    RtpConfigurationGroupVisitorPtr dropper = new groupDropper(mImplPriv);
+    for (ConfigurationGroupSeq::const_iterator it = groups.begin(); it != groups.end(); ++it)
+    {
+        (*it)->visit(dropper);
+    }
+}
+
+/**
+ * Internal function which returns the configured start port.
+ *
+ * Falls back to 10000 when there is no general group, no 'ports' item,
+ * or the 'ports' item is not a PortRangesItem.
+ */
+int ConfigurationServiceImpl::getStartPort()
+{
+    const int fallbackPort = 10000;
+
+    if (!mImplPriv->mGeneralGroup)
+    {
+        return fallbackPort;
+    }
+
+    ConfigurationItemDict& stored = mImplPriv->mGeneralGroup->configurationItems;
+    ConfigurationItemDict::iterator found = stored.find("ports");
+    if (found == stored.end())
+    {
+        return fallbackPort;
+    }
+
+    // A null cast result means the stored item has some other concrete type.
+    PortRangesItemPtr range = PortRangesItemPtr::dynamicCast(found->second);
+    return range ? range->startPort : fallbackPort;
+}
+
+/**
+ * Internal function which returns the configured end port.
+ *
+ * Falls back to 20000 when there is no general group, no 'ports' item,
+ * or the 'ports' item is not a PortRangesItem.
+ */
+int ConfigurationServiceImpl::getEndPort()
+{
+    const int fallbackPort = 20000;
+
+    if (!mImplPriv->mGeneralGroup)
+    {
+        return fallbackPort;
+    }
+
+    ConfigurationItemDict& stored = mImplPriv->mGeneralGroup->configurationItems;
+    ConfigurationItemDict::iterator found = stored.find("ports");
+    if (found == stored.end())
+    {
+        return fallbackPort;
+    }
+
+    // A null cast result means the stored item has some other concrete type.
+    PortRangesItemPtr range = PortRangesItemPtr::dynamicCast(found->second);
+    return range ? range->endPort : fallbackPort;
+}
+
+/**
+ * Internal function which returns the configured worker thread count.
+ *
+ * Falls back to 4 when there is no general group, no 'workerThreadCount' item,
+ * or that item is not a WorkerThreadCountItem.
+ */
+int ConfigurationServiceImpl::getWorkerThreadCount()
+{
+    const int fallbackCount = 4;
+
+    if (!mImplPriv->mGeneralGroup)
+    {
+        return fallbackCount;
+    }
+
+    ConfigurationItemDict& stored = mImplPriv->mGeneralGroup->configurationItems;
+    ConfigurationItemDict::iterator found = stored.find("workerThreadCount");
+    if (found == stored.end())
+    {
+        return fallbackCount;
+    }
+
+    // A null cast result means the stored item has some other concrete type.
+    WorkerThreadCountItemPtr threads = WorkerThreadCountItemPtr::dynamicCast(found->second);
+    return threads ? threads->count : fallbackCount;
+}
diff --git a/src/RTPConfiguration.h b/src/RTPConfiguration.h
new file mode 100644
index 0000000..e1e761c
--- /dev/null
+++ b/src/RTPConfiguration.h
@@ -0,0 +1,64 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2011, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+
+#pragma once
+
+#include <Ice/Ice.h>
+
+#include <AsteriskSCF/System/Component/ConfigurationIf.h>
+
+#include "RtpConfigurationIf.h"
+
+/*
+ * Private implementation class for ConfigurationServiceImpl.
+ */
+class ConfigurationServiceImplPriv;
+
+/**
+ * Implementation of the configuration service.
+ */
+class ConfigurationServiceImpl : public AsteriskSCF::System::Configuration::V1::ConfigurationService
+{
+public:
+    ConfigurationServiceImpl();
+
+    /**
+     * Interface defined functions (get, set and remove configuration groups and items).
+     */
+    AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq getConfiguration(const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq&, const Ice::Current&);
+    AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq getConfigurationAll(const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq&, const Ice::Current&);
+    AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq getConfigurationGroups(const Ice::Current&);
+    void setConfiguration(const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq&, const Ice::Current&);
+    void removeConfigurationItems(const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq&, const Ice::Current&);
+    void removeConfigurationGroups(const AsteriskSCF::System::Configuration::V1::ConfigurationGroupSeq&, const Ice::Current&);
+
+    /**
+     * Internal accessors for the stored settings; a built-in default is returned when a setting is absent.
+     */
+    int getStartPort();
+    int getEndPort();
+    int getWorkerThreadCount();
+private:
+    /**
+     * Private implementation details; ConfigurationServiceImplPriv is defined in RTPConfiguration.cpp.
+     */
+    boost::shared_ptr<ConfigurationServiceImplPriv> mImplPriv;
+};
+
+/**
+ * A typedef which creates a smart pointer type for ConfigurationServiceImpl.
+ */
+typedef IceUtil::Handle<ConfigurationServiceImpl> ConfigurationServiceImplPtr;
diff --git a/src/RTPSession.cpp b/src/RTPSession.cpp
index 2ab6074..b97a32a 100644
--- a/src/RTPSession.cpp
+++ b/src/RTPSession.cpp
@@ -31,6 +31,8 @@
 #include "RTPSource.h"
 #include "RTPSink.h"
 
+#include "RTPConfiguration.h"
+
 using namespace std;
 using namespace AsteriskSCF::Core::Discovery::V1;
 using namespace AsteriskSCF::Media::V1;
@@ -55,8 +57,8 @@ class RTPSessionImplPriv
 {
 public:
     RTPSessionImplPriv(const Ice::ObjectAdapterPtr& adapter, const FormatSeq& formats,
-            const ReplicaPtr& replicaService,
-            const AsteriskSCF::SmartProxy::SmartProxy<RtpStateReplicatorPrx>& stateReplicator) :
+	const ReplicaPtr& replicaService,
+	const AsteriskSCF::SmartProxy::SmartProxy<RtpStateReplicatorPrx>& stateReplicator) :
 	mAdapter(adapter), mFormats(formats),
         mSessionStateItem(new RtpSessionStateItem()),
         mReplicaService(replicaService), mStateReplicator(stateReplicator) { };
@@ -132,23 +134,27 @@ public:
  * Constructor for the RTPSessionImpl class (used by Ice).
  */
 RTPSessionImpl::RTPSessionImpl(const Ice::ObjectAdapterPtr& adapter, const FormatSeq& formats,
-        pj_pool_factory* factory, const ReplicaPtr& replicaService,
-        const AsteriskSCF::SmartProxy::SmartProxy<RtpStateReplicatorPrx>& stateReplicator) : 
+    pj_pool_factory* factory, const ReplicaPtr& replicaService,
+    const AsteriskSCF::SmartProxy::SmartProxy<RtpStateReplicatorPrx>& stateReplicator,
+    const ConfigurationServiceImplPtr& configurationService) : 
     mImpl(new RTPSessionImplPriv(adapter, formats, replicaService, stateReplicator))
 {
     /* Add ourselves to the ICE ASM so we can be used. */
     mImpl->mProxy = RTPSessionPrx::uncheckedCast(adapter->addWithUUID(this));
 
     /* Create an endpoint in pjmedia for our media. */
-    pj_status_t status = pjmedia_endpt_create(factory, NULL, 1, &mImpl->mEndpoint);
+    pj_status_t status = pjmedia_endpt_create(factory, NULL, configurationService->getWorkerThreadCount(), &mImpl->mEndpoint);
 
     if (status != PJ_SUCCESS)
     {
         /* TODO: This is bad... we can't go on! */
     }
 
+    int minimumPort = configurationService->getStartPort();
+    int maximumPort = configurationService->getEndPort();
+
     /* Now create some transport we can use to actually send or receive the media. */
-    for (int port = DEFAULT_RTP_PORT_MINIMUM; port < DEFAULT_RTP_PORT_MAXIMUM; port += 2)
+    for (int port = minimumPort; port < maximumPort; port += 2)
     {
         if ((status = pjmedia_transport_udp_create2(mImpl->mEndpoint, "RTP", NULL, port, 0, &mImpl->mTransport)) ==
                 PJ_SUCCESS)
@@ -181,13 +187,13 @@ RTPSessionImpl::RTPSessionImpl(const Ice::ObjectAdapterPtr& adapter, const Forma
  * Constructor for the RTPSessionImpl class (used by state replicator).
  */
 RTPSessionImpl::RTPSessionImpl(const Ice::ObjectAdapterPtr& adapter, pj_pool_factory* factory,
-        const Ice::Identity& sessionIdentity, const Ice::Identity& sinkIdentity, const Ice::Identity& sourceIdentity,
-        Ice::Int port, const FormatSeq& formats) :
+    const Ice::Identity& sessionIdentity, const Ice::Identity& sinkIdentity, const Ice::Identity& sourceIdentity,
+    Ice::Int port, const FormatSeq& formats, const ConfigurationServiceImplPtr& configurationService) :
     mImpl(new RTPSessionImplPriv(adapter, formats, 0, *(new AsteriskSCF::SmartProxy::SmartProxy<RtpStateReplicatorPrx>)))
 {
     mImpl->mProxy = RTPSessionPrx::uncheckedCast(adapter->add(this, sessionIdentity));
 
-    pj_status_t status = pjmedia_endpt_create(factory, NULL, 1, &mImpl->mEndpoint);
+    pj_status_t status = pjmedia_endpt_create(factory, NULL, configurationService->getWorkerThreadCount(), &mImpl->mEndpoint);
 
     if (status != PJ_SUCCESS)
     {
diff --git a/src/RTPSession.h b/src/RTPSession.h
index a8e4b7a..46d596d 100644
--- a/src/RTPSession.h
+++ b/src/RTPSession.h
@@ -10,6 +10,8 @@
 
 #include <boost/shared_ptr.hpp>
 
+#include "RTPConfiguration.h"
+
 /**
  * Forward definition for our private implementation of RTPSession.
  */
@@ -42,10 +44,13 @@ class RTPSessionImpl : public AsteriskSCF::Media::RTP::V1::RTPSession
 {
 public:
     RTPSessionImpl(const Ice::ObjectAdapterPtr&, const AsteriskSCF::Media::V1::FormatSeq&,
-            pj_pool_factory*, const AsteriskSCF::System::Component::V1::ReplicaPtr&, 
-            const AsteriskSCF::SmartProxy::SmartProxy<AsteriskSCF::Media::RTP::V1::RtpStateReplicatorPrx>&);
+	pj_pool_factory*, const AsteriskSCF::System::Component::V1::ReplicaPtr&, 
+	const AsteriskSCF::SmartProxy::SmartProxy<AsteriskSCF::Media::RTP::V1::RtpStateReplicatorPrx>&,
+	const ConfigurationServiceImplPtr&
+	);
     RTPSessionImpl(const Ice::ObjectAdapterPtr&, pj_pool_factory*, const Ice::Identity&, const Ice::Identity&,
-            const Ice::Identity&, Ice::Int, const AsteriskSCF::Media::V1::FormatSeq&);
+	const Ice::Identity&, Ice::Int, const AsteriskSCF::Media::V1::FormatSeq&,
+	const ConfigurationServiceImplPtr&);
     AsteriskSCF::Media::V1::StreamSourceSeq getSources(const Ice::Current&);
     AsteriskSCF::Media::V1::StreamSinkSeq getSinks(const Ice::Current&);
     std::string getId(const Ice::Current&);
diff --git a/src/RtpStateReplicator.h b/src/RtpStateReplicator.h
index b2db101..3e04e0c 100644
--- a/src/RtpStateReplicator.h
+++ b/src/RtpStateReplicator.h
@@ -21,6 +21,7 @@
 #include <AsteriskSCF/StateReplicator.h>
 
 #include "RtpStateReplicationIf.h"
+#include "RTPConfiguration.h"
 
 using namespace AsteriskSCF::Media::RTP::V1;
 
@@ -31,7 +32,8 @@ typedef IceUtil::Handle<RtpStateReplicatorI> RtpStateReplicatorIPtr;
 class RtpStateReplicatorListenerI : public RtpStateReplicatorListener
 {
 public:
-    RtpStateReplicatorListenerI(const Ice::ObjectAdapterPtr&, pj_pool_factory*, const RtpGeneralStateItemPtr&);
+    RtpStateReplicatorListenerI(const Ice::ObjectAdapterPtr&, pj_pool_factory*, const RtpGeneralStateItemPtr&,
+	const ConfigurationServiceImplPtr&);
     ~RtpStateReplicatorListenerI();
     void stateRemoved(const Ice::StringSeq&, const Ice::Current&);
     void stateSet(const RtpStateItemSeq&, const Ice::Current&);
diff --git a/src/RtpStateReplicatorListener.cpp b/src/RtpStateReplicatorListener.cpp
index a42c7b5..d610af8 100644
--- a/src/RtpStateReplicatorListener.cpp
+++ b/src/RtpStateReplicatorListener.cpp
@@ -57,8 +57,10 @@ struct RtpStateReplicatorListenerImpl
 {
 public:
     RtpStateReplicatorListenerImpl(const Ice::ObjectAdapterPtr& adapter, pj_pool_factory *poolFactory,
-            const RtpGeneralStateItemPtr& generalState)
-        : mId(IceUtil::generateUUID()), mAdapter(adapter), mPoolFactory(poolFactory), mGeneralState(generalState) {}
+	const RtpGeneralStateItemPtr& generalState,
+	const ConfigurationServiceImplPtr& configurationService)
+        : mId(IceUtil::generateUUID()), mAdapter(adapter), mPoolFactory(poolFactory), mGeneralState(generalState),
+	  mConfigurationService(configurationService) {}
     
     void removeStateNoticeImpl(const Ice::StringSeq& itemKeys)
     {
@@ -98,7 +100,8 @@ public:
 
 		    RTPSessionImplPtr localSession =
                         new RTPSessionImpl(mImpl->mAdapter, mImpl->mPoolFactory, item->mSessionIdentity,
-                                item->mSinkIdentity, item->mSourceIdentity, item->mPort, item->mFormats);
+			    item->mSinkIdentity, item->mSourceIdentity, item->mPort, item->mFormats,
+			    mImpl->mConfigurationService);
 		    localitem->setSession(localSession);
 		}
 		else
@@ -145,11 +148,13 @@ public:
     Ice::ObjectAdapterPtr mAdapter;
     pj_pool_factory *mPoolFactory;
     RtpGeneralStateItemPtr mGeneralState;
+    ConfigurationServiceImplPtr mConfigurationService;
 };
 
 RtpStateReplicatorListenerI::RtpStateReplicatorListenerI(const Ice::ObjectAdapterPtr& adapter,
-        pj_pool_factory *poolFactory, const RtpGeneralStateItemPtr& generalState)
-    : mImpl(new RtpStateReplicatorListenerImpl(adapter, poolFactory, generalState)) {}
+    pj_pool_factory *poolFactory, const RtpGeneralStateItemPtr& generalState,
+    const ConfigurationServiceImplPtr& configurationService)
+    : mImpl(new RtpStateReplicatorListenerImpl(adapter, poolFactory, generalState, configurationService)) {}
 
 RtpStateReplicatorListenerI::~RtpStateReplicatorListenerI()
 {

-----------------------------------------------------------------------


-- 
asterisk-scf/integration/media_rtp_pjmedia.git



More information about the asterisk-scf-commits mailing list