[asterisk-scf-commits] asterisk-scf/integration/sip.git branch "limit_thread_pool" created.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Thu Jul 26 19:13:45 CDT 2012
branch "limit_thread_pool" has been created at a6c9a6b1d8e3e1219d4509e18d8300fa2ec38aad (commit)
- Log -----------------------------------------------------------------
commit a6c9a6b1d8e3e1219d4509e18d8300fa2ec38aad
Author: Ken Hunt <ken.hunt at digium.com>
Date: Thu Jul 26 19:12:43 2012 -0500
Limit the growth of the thread pool.
diff --git a/config/test_sip.conf b/config/test_sip.conf
index 3254e57..2a80127 100644
--- a/config/test_sip.conf
+++ b/config/test_sip.conf
@@ -27,6 +27,9 @@ SIPSessionGateway.SIP.RoutingDestinationId=pjsip
# PJSIP Modules to register
SIPSessionGateway.SIP.Modules=Session
+SIPSessionGateway.SIP.Modules.Session.ThreadPool.DefaultSize=20
+SIPSessionGateway.SIP.Modules.Session.ThreadPool.MaxSize=30
+
# The service name of the State replicator to use
SIPSessionGateway.SIP.StateReplicatorService=default
diff --git a/src/Component.cpp b/src/Component.cpp
index c38816d..3a5c6ca 100644
--- a/src/Component.cpp
+++ b/src/Component.cpp
@@ -385,6 +385,7 @@ void Component::registerPJSIPModules()
boost::static_pointer_cast<SIPReplicationContext>(getReplicationContext());
Ice::PropertiesPtr props = getCommunicator()->getProperties();
+
Ice::StringSeq moduleNames = props->getPropertyAsList(getName() + ".SIP.Modules");
for (Ice::StringSeq::iterator i = moduleNames.begin();
i != moduleNames.end();
@@ -399,7 +400,11 @@ void Component::registerPJSIPModules()
//
if ((*i) == "Session")
{
- mPJSIPManager->registerSessionModule(mEndpointFactory,
+ Ice::PropertyDict sessionModuleProps = props->getPropertiesForPrefix(getName() + ".SIP.Modules.Session");
+
+ mPJSIPManager->registerSessionModule(
+ sessionModuleProps,
+ mEndpointFactory,
mSessionRouter,
getServiceLocator(),
sipReplicationContext,
diff --git a/src/PJSIPManager.cpp b/src/PJSIPManager.cpp
index 10fa36b..9a0f31f 100644
--- a/src/PJSIPManager.cpp
+++ b/src/PJSIPManager.cpp
@@ -128,7 +128,9 @@ PJSIPManager::~PJSIPManager()
}
-void PJSIPManager::registerSessionModule(const boost::shared_ptr<SIPEndpointFactory>& endpointFactoryPtr,
+void PJSIPManager::registerSessionModule(
+ const Ice::PropertyDict& sessionModuleProps,
+ const boost::shared_ptr<SIPEndpointFactory>& endpointFactoryPtr,
const AsteriskSCF::Discovery::SmartProxy<
AsteriskSCF::SessionCommunications::V1::SessionRouterPrx>& sessionRouter,
const AsteriskSCF::Core::Discovery::V1::ServiceLocatorPrx& serviceLocator,
@@ -137,7 +139,7 @@ void PJSIPManager::registerSessionModule(const boost::shared_ptr<SIPEndpointFact
const AsteriskSCF::Core::Discovery::V1::ServiceLocatorManagementPrx& serviceLocatorManagement
)
{
- mSessionModule = new PJSIPSessionModule(mEndpoint, endpointFactoryPtr, sessionRouter,
+ mSessionModule = new PJSIPSessionModule(sessionModuleProps, mEndpoint, endpointFactoryPtr, sessionRouter,
serviceLocator, replicationContext, adapter, serviceLocatorManagement);
}
diff --git a/src/PJSIPManager.h b/src/PJSIPManager.h
index e5eecef..559f4ce 100644
--- a/src/PJSIPManager.h
+++ b/src/PJSIPManager.h
@@ -95,7 +95,9 @@ public:
* Register the PJSIPSessionModule, responsible
* for basic call handling
*/
- void registerSessionModule(const boost::shared_ptr<SIPEndpointFactory>& endpointFactoryPtr,
+ void registerSessionModule(
+ const Ice::PropertyDict& sessionModuleProps,
+ const boost::shared_ptr<SIPEndpointFactory>& endpointFactoryPtr,
const AsteriskSCF::Discovery::SmartProxy<AsteriskSCF::SessionCommunications::V1::SessionRouterPrx>&
sessionRouter,
const AsteriskSCF::Core::Discovery::V1::ServiceLocatorPrx& serviceLocator,
diff --git a/src/PJSIPSessionModule.cpp b/src/PJSIPSessionModule.cpp
index bc009ae..07ac87c 100644
--- a/src/PJSIPSessionModule.cpp
+++ b/src/PJSIPSessionModule.cpp
@@ -29,6 +29,7 @@
#include <boost/lexical_cast.hpp>
#include <boost/algorithm/string.hpp>
#include <boost/numeric/conversion/cast.hpp>
+#include <algorithm>
#include <AsteriskSCF/Core/Endpoint/EndpointIf.h>
#include <AsteriskSCF/Core/Routing/RoutingIf.h>
@@ -2352,40 +2353,73 @@ void PJSIPSessionModule::enqueueSessionWork(const SuspendableWorkPtr& work, pjsi
}
}
-PJSIPSessionModuleThreadPoolListener::PJSIPSessionModuleThreadPoolListener()
- : mActiveThreads(0), mPJLIBHook(new AsteriskSCF::PJLIB::ThreadHook("SIP Session Module Pool")) { }
+PJSIPSessionModuleThreadPoolListener::PJSIPSessionModuleThreadPoolListener(Ice::Long defaultPoolSize, Ice::Long maxPoolSize)
+ : mActiveThreads(0),
+ mPJLIBHook(new AsteriskSCF::PJLIB::ThreadHook("SIP Session Module Pool")),
+ mDefaultPoolSize(defaultPoolSize),
+ mMaxPoolSize(maxPoolSize)
+{
+}
+
+void PJSIPSessionModuleThreadPoolListener::initialized(const PoolPtr& pool)
+{
+ pool->setSize(mDefaultPoolSize);
+ mActiveThreads = mDefaultPoolSize;
+}
void PJSIPSessionModuleThreadPoolListener::stateChanged(const PoolPtr& pool, Ice::Long active, Ice::Long idle, Ice::Long)
{
//XXX Making this behavior more customizable would be nice
//
//For now, what we do is kill all idle threads.
- if (idle > 0)
+
+ int poolSize = active + idle;
+ int newPoolSize = poolSize;
+
+ // Should we consider downsizing the pool?
+ if (poolSize > mDefaultPoolSize)
{
- pool->setSize((int) active);
+ // Are there any idle threads to shut down?
+ if (idle > 0)
+ {
+ newPoolSize = std::max(mDefaultPoolSize, active);
+ if (newPoolSize != poolSize)
+ {
+ mActiveThreads = newPoolSize;
+ pool->setSize(newPoolSize);
+ }
+ }
}
- mActiveThreads = active;
}
-void PJSIPSessionModuleThreadPoolListener::queueWorkAdded(const PoolPtr& pool, Ice::Long numNewWork, bool)
+void PJSIPSessionModuleThreadPoolListener::queueWorkAdded(const PoolPtr& pool, Ice::Long numNewWork, Ice::Long newQueueSize)
{
//XXX Making this behavior more customizable would be nice
- //
- //For now, use one thread per work item.
- lg(Debug) << "Detected the addition of work to SIP's thread pool. Setting the size to " << mActiveThreads + numNewWork;
- int newSize = (int) (mActiveThreads + numNewWork);
- pool->setSize(newSize);
+ if (mActiveThreads < mMaxPoolSize)
+ {
+ if (newQueueSize > mDefaultPoolSize && newQueueSize < mActiveThreads)
+ {
+ int newSize = std::min(mMaxPoolSize, newQueueSize);
+ lg(Debug) << "SIP's thread pool queue size exceeds default threads. Setting pool size to " << newSize;
+ mActiveThreads = newSize;
+ pool->setSize(newSize);
+ }
+ }
}
void PJSIPSessionModuleThreadPoolListener::queueEmptied(const PoolPtr& pool)
{
//XXX Making this behavior more customizable would be nice
//
- //For now, kill off everything
- lg(Debug) << "The queue is empty so we're killing all the threads";
- pool->setSize(0);
+ if (mActiveThreads > mDefaultPoolSize)
+ {
+ lg(Debug) << "The queue is empty so we're dropping thread pool back to default size.";
+
+ mActiveThreads = mDefaultPoolSize;
+ pool->setSize(mDefaultPoolSize);
+ }
}
void PJSIPSessionModuleThreadPoolListener::threadStart()
@@ -2419,12 +2453,14 @@ SessionWork::SessionWork(const QueuePtr& queue)
: mThreadPoolQueue(queue),
mInternalQueue(new SuspendableWorkQueue(this)) { }
-void SessionWork::workAdded(const QueueBasePtr&, Ice::Long, bool wasEmpty)
+void SessionWork::workAdded(const QueueBasePtr&, Ice::Long numWorkAdded, Ice::Long newQueueSize)
{
- if (wasEmpty)
+ // Was this WorkQueue empty?
+ if (numWorkAdded == newQueueSize)
{
try
{
+ // We need to be put back into the Pool's queue to be assigned to a worker thread.
mThreadPoolQueue->enqueueWork(this);
}
catch (const ShuttingDown&)
diff --git a/src/PJSIPSessionModule.h b/src/PJSIPSessionModule.h
index d08b475..1688609 100644
--- a/src/PJSIPSessionModule.h
+++ b/src/PJSIPSessionModule.h
@@ -81,15 +81,18 @@ private:
class PJSIPSessionModuleThreadPoolListener : public AsteriskSCF::System::ThreadPool::V1::PoolListener
{
public:
- PJSIPSessionModuleThreadPoolListener();
+ PJSIPSessionModuleThreadPoolListener(Ice::Long defaultPoolSize, Ice::Long maxPoolSize);
+ void initialized(const AsteriskSCF::System::ThreadPool::V1::PoolPtr& pool);
void stateChanged(const AsteriskSCF::System::ThreadPool::V1::PoolPtr& pool, Ice::Long active, Ice::Long idle, Ice::Long zombie);
- void queueWorkAdded(const AsteriskSCF::System::ThreadPool::V1::PoolPtr& pool, Ice::Long count, bool wasEmpty);
+ void queueWorkAdded(const AsteriskSCF::System::ThreadPool::V1::PoolPtr& pool, Ice::Long count, Ice::Long newQueueSize);
void queueEmptied(const AsteriskSCF::System::ThreadPool::V1::PoolPtr& pool);
void threadStart();
void threadStop();
private:
Ice::Long mActiveThreads;
AsteriskSCF::PJLIB::ThreadHook *mPJLIBHook;
+ Ice::Long mDefaultPoolSize;
+ Ice::Long mMaxPoolSize;
};
typedef IceUtil::Handle<PJSIPSessionModuleThreadPoolListener> PJSIPSessionModuleThreadPoolListenerPtr;
@@ -127,7 +130,10 @@ typedef std::vector<ReasonId> ReasonIdSeq;
class PJSIPSessionModule : public PJSIPModule
{
public:
- PJSIPSessionModule(pjsip_endpoint *endpt, const boost::shared_ptr<SIPEndpointFactory>& endpointFactoryPtr,
+ PJSIPSessionModule(
+ const Ice::PropertyDict& sessionModuleProps,
+ pjsip_endpoint *endpt,
+ const boost::shared_ptr<SIPEndpointFactory>& endpointFactoryPtr,
const AsteriskSCF::Discovery::SmartProxy<AsteriskSCF::SessionCommunications::V1::SessionRouterPrx>&
sessionRouter,
const AsteriskSCF::Core::Discovery::V1::ServiceLocatorPrx& serviceLocator,
diff --git a/src/PJSIPSessionModuleConstruction.cpp b/src/PJSIPSessionModuleConstruction.cpp
index f5fe146..c627778 100644
--- a/src/PJSIPSessionModuleConstruction.cpp
+++ b/src/PJSIPSessionModuleConstruction.cpp
@@ -14,6 +14,8 @@
* at the top of the source tree.
*/
+#include <boost/algorithm/string/predicate.hpp>
+
#include <AsteriskSCF/Logger.h>
#include <AsteriskSCF/WorkQueue/WorkQueue.h>
#include <AsteriskSCF/ThreadPool/ThreadPool.h>
@@ -130,7 +132,25 @@ static void invOnSendReinviteResponse(pjsip_inv_session *inv, pjsip_tx_data *tda
return sessionModule->invOnSendReinviteResponse(inv, tdata);
}
-PJSIPSessionModule::PJSIPSessionModule(pjsip_endpoint *endpt,
+int getIntValue(const std::string& propertyName, const std::string& propertyValue, int defaultValue)
+{
+ int value = defaultValue;
+ try
+ {
+ value = boost::lexical_cast<int>(propertyValue);
+ }
+ catch(boost::bad_lexical_cast const&)
+ {
+ lg(Error) << "Invalid value for " << propertyName << " specified. Using default of " << defaultValue;
+ return defaultValue;
+ }
+
+ return value;
+}
+
+PJSIPSessionModule::PJSIPSessionModule(
+ const Ice::PropertyDict& sessionModuleProps,
+ pjsip_endpoint *endpt,
const boost::shared_ptr<SIPEndpointFactory>& endpointFactoryPtr,
const AsteriskSCF::Discovery::SmartProxy<AsteriskSCF::SessionCommunications::V1::SessionRouterPrx>& sessionRouter,
const AsteriskSCF::Core::Discovery::V1::ServiceLocatorPrx& serviceLocator,
@@ -142,6 +162,7 @@ PJSIPSessionModule::PJSIPSessionModule(pjsip_endpoint *endpt,
mReplicationContext(replicationContext), mEndpoint(endpt),
mSessionCreationExtensionPoint(new SIPSessionCreationExtensionPoint()), mAdapter(adapter)
{
+
sessionModule = this;
mModule.name = pj_str(moduleName);
mModule.priority = PJSIP_MOD_PRIORITY_APPLICATION;
@@ -163,6 +184,22 @@ PJSIPSessionModule::PJSIPSessionModule(pjsip_endpoint *endpt,
mSessionCreationExtensionPointPrx,
SessionCreationExtensionPointId);
+ int defaultThreadPoolSize=10;
+ int maxThreadPoolSize=15;
+
+ Ice::PropertyDict::const_iterator prop;
+ for (prop = sessionModuleProps.begin(); prop != sessionModuleProps.end(); prop++)
+ {
+ if (boost::algorithm::ends_with((*prop).first, "ThreadPool.DefaultSize"))
+ {
+ defaultThreadPoolSize = getIntValue((*prop).first, (*prop).second, defaultThreadPoolSize);
+ }
+ else if (boost::algorithm::ends_with((*prop).first, "ThreadPool.MaxSize"))
+ {
+ maxThreadPoolSize = getIntValue((*prop).first, (*prop).second, maxThreadPoolSize);
+ }
+ }
+
// TBD... how to access the Component's service and instance ids.
mSessionCreationExtensionPointService->addLocatorParams(
AsteriskSCF::Operations::createContext(),
@@ -170,7 +207,7 @@ PJSIPSessionModule::PJSIPSessionModule(pjsip_endpoint *endpt,
"");
mPoolQueue = new AsteriskSCF::WorkQueue::WorkQueue();
- mPoolListener = new PJSIPSessionModuleThreadPoolListener();
+ mPoolListener = new PJSIPSessionModuleThreadPoolListener(defaultThreadPoolSize, maxThreadPoolSize);
ThreadPoolFactoryPtr factory(new ThreadPoolFactory);
mPool = factory->createPool(mPoolListener, mPoolQueue);
diff --git a/src/SIPSession.h b/src/SIPSession.h
index e4aa05a..0464b3b 100644
--- a/src/SIPSession.h
+++ b/src/SIPSession.h
@@ -77,7 +77,7 @@ public:
* Overrides of QueueListener interface
*/
SessionWork(const AsteriskSCF::System::WorkQueue::V1::QueuePtr& threadPoolQueue);
- void workAdded(const AsteriskSCF::System::WorkQueue::V1::QueueBasePtr&, Ice::Long newWork, bool wasEmpty);
+ void workAdded(const AsteriskSCF::System::WorkQueue::V1::QueueBasePtr&, Ice::Long newWork, Ice::Long newQueueSize);
void workResumable(const AsteriskSCF::System::WorkQueue::V1::QueueBasePtr&);
void emptied(const AsteriskSCF::System::WorkQueue::V1::QueueBasePtr&);
void shuttingDown(const AsteriskSCF::System::WorkQueue::V1::QueueBasePtr&);
-----------------------------------------------------------------------
--
asterisk-scf/integration/sip.git
More information about the asterisk-scf-commits mailing list