[asterisk-scf-commits] asterisk-scf/integration/sip.git branch "limit_thread_pool" created.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Thu Jul 26 19:13:45 CDT 2012


branch "limit_thread_pool" has been created
        at  a6c9a6b1d8e3e1219d4509e18d8300fa2ec38aad (commit)

- Log -----------------------------------------------------------------
commit a6c9a6b1d8e3e1219d4509e18d8300fa2ec38aad
Author: Ken Hunt <ken.hunt at digium.com>
Date:   Thu Jul 26 19:12:43 2012 -0500

    Limit the growth of the thread pool.

diff --git a/config/test_sip.conf b/config/test_sip.conf
index 3254e57..2a80127 100644
--- a/config/test_sip.conf
+++ b/config/test_sip.conf
@@ -27,6 +27,9 @@ SIPSessionGateway.SIP.RoutingDestinationId=pjsip
 # PJSIP Modules to register
 SIPSessionGateway.SIP.Modules=Session
 
+SIPSessionGateway.SIP.Modules.Session.ThreadPool.DefaultSize=20
+SIPSessionGateway.SIP.Modules.Session.ThreadPool.MaxSize=30
+
 # The service name of the State replicator to use
 SIPSessionGateway.SIP.StateReplicatorService=default
 
diff --git a/src/Component.cpp b/src/Component.cpp
index c38816d..3a5c6ca 100644
--- a/src/Component.cpp
+++ b/src/Component.cpp
@@ -385,6 +385,7 @@ void Component::registerPJSIPModules()
         boost::static_pointer_cast<SIPReplicationContext>(getReplicationContext());
 
     Ice::PropertiesPtr props = getCommunicator()->getProperties();
+
     Ice::StringSeq moduleNames = props->getPropertyAsList(getName() + ".SIP.Modules");
     for (Ice::StringSeq::iterator i = moduleNames.begin();
          i != moduleNames.end();
@@ -399,7 +400,11 @@ void Component::registerPJSIPModules()
         //
         if ((*i) == "Session")
         {
-            mPJSIPManager->registerSessionModule(mEndpointFactory,
+            Ice::PropertyDict sessionModuleProps = props->getPropertiesForPrefix(getName() + ".SIP.Modules.Session");
+
+            mPJSIPManager->registerSessionModule(
+                sessionModuleProps,
+                mEndpointFactory,
                 mSessionRouter,
                 getServiceLocator(),
                 sipReplicationContext,
diff --git a/src/PJSIPManager.cpp b/src/PJSIPManager.cpp
index 10fa36b..9a0f31f 100644
--- a/src/PJSIPManager.cpp
+++ b/src/PJSIPManager.cpp
@@ -128,7 +128,9 @@ PJSIPManager::~PJSIPManager()
 }
 
 
-void PJSIPManager::registerSessionModule(const boost::shared_ptr<SIPEndpointFactory>& endpointFactoryPtr,
+void PJSIPManager::registerSessionModule(
+        const Ice::PropertyDict& sessionModuleProps,
+        const boost::shared_ptr<SIPEndpointFactory>& endpointFactoryPtr,
         const AsteriskSCF::Discovery::SmartProxy<
             AsteriskSCF::SessionCommunications::V1::SessionRouterPrx>& sessionRouter,
         const AsteriskSCF::Core::Discovery::V1::ServiceLocatorPrx& serviceLocator,
@@ -137,7 +139,7 @@ void PJSIPManager::registerSessionModule(const boost::shared_ptr<SIPEndpointFact
         const AsteriskSCF::Core::Discovery::V1::ServiceLocatorManagementPrx& serviceLocatorManagement
     )
 {
-    mSessionModule = new PJSIPSessionModule(mEndpoint, endpointFactoryPtr, sessionRouter,
+    mSessionModule = new PJSIPSessionModule(sessionModuleProps, mEndpoint, endpointFactoryPtr, sessionRouter,
         serviceLocator, replicationContext, adapter, serviceLocatorManagement);
 }
 
diff --git a/src/PJSIPManager.h b/src/PJSIPManager.h
index e5eecef..559f4ce 100644
--- a/src/PJSIPManager.h
+++ b/src/PJSIPManager.h
@@ -95,7 +95,9 @@ public:
      * Register the PJSIPSessionModule, responsible
      * for basic call handling
      */
-    void registerSessionModule(const boost::shared_ptr<SIPEndpointFactory>& endpointFactoryPtr,
+    void registerSessionModule(
+        const Ice::PropertyDict& sessionModuleProps,
+        const boost::shared_ptr<SIPEndpointFactory>& endpointFactoryPtr,
         const AsteriskSCF::Discovery::SmartProxy<AsteriskSCF::SessionCommunications::V1::SessionRouterPrx>&
             sessionRouter,
         const AsteriskSCF::Core::Discovery::V1::ServiceLocatorPrx& serviceLocator,
diff --git a/src/PJSIPSessionModule.cpp b/src/PJSIPSessionModule.cpp
index bc009ae..07ac87c 100644
--- a/src/PJSIPSessionModule.cpp
+++ b/src/PJSIPSessionModule.cpp
@@ -29,6 +29,7 @@
 #include <boost/lexical_cast.hpp>
 #include <boost/algorithm/string.hpp>
 #include <boost/numeric/conversion/cast.hpp>
+#include <algorithm>
 
 #include <AsteriskSCF/Core/Endpoint/EndpointIf.h>
 #include <AsteriskSCF/Core/Routing/RoutingIf.h>
@@ -2352,40 +2353,73 @@ void PJSIPSessionModule::enqueueSessionWork(const SuspendableWorkPtr& work, pjsi
     }
 }
 
-PJSIPSessionModuleThreadPoolListener::PJSIPSessionModuleThreadPoolListener()
-    : mActiveThreads(0), mPJLIBHook(new AsteriskSCF::PJLIB::ThreadHook("SIP Session Module Pool")) { }
+PJSIPSessionModuleThreadPoolListener::PJSIPSessionModuleThreadPoolListener(Ice::Long defaultPoolSize, Ice::Long maxPoolSize)
+    : mActiveThreads(0), 
+      mPJLIBHook(new AsteriskSCF::PJLIB::ThreadHook("SIP Session Module Pool")), 
+      mDefaultPoolSize(defaultPoolSize),
+      mMaxPoolSize(maxPoolSize) 
+{ 
+}
+
+void PJSIPSessionModuleThreadPoolListener::initialized(const PoolPtr& pool)
+{
+    pool->setSize(mDefaultPoolSize);
+    mActiveThreads = mDefaultPoolSize;
+}
 
 void PJSIPSessionModuleThreadPoolListener::stateChanged(const PoolPtr& pool, Ice::Long active, Ice::Long idle, Ice::Long)
 {
     //XXX Making this behavior more customizable would be nice
     //
     //For now, what we do is kill all idle threads.
-    if (idle > 0)
+
+    int poolSize = active + idle;
+    int newPoolSize = poolSize; 
+
+    // Should we consider downsizing the pool?
+    if (poolSize > mDefaultPoolSize)
     {
-        pool->setSize((int) active);
+        // Are there any idle threads to shut down?
+        if (idle > 0)
+        {
+           newPoolSize = std::max(mDefaultPoolSize, active);
+           if (newPoolSize != poolSize)
+           {
+               mActiveThreads = newPoolSize;
+               pool->setSize(newPoolSize);
+           }
+        }
     }
-    mActiveThreads = active;
 }
 
-void PJSIPSessionModuleThreadPoolListener::queueWorkAdded(const PoolPtr& pool, Ice::Long numNewWork, bool)
+void PJSIPSessionModuleThreadPoolListener::queueWorkAdded(const PoolPtr& pool, Ice::Long numNewWork, Ice::Long newQueueSize)
 {
     //XXX Making this behavior more customizable would be nice
-    //
-    //For now, use one thread per work item.
 
-    lg(Debug) << "Detected the addition of work to SIP's thread pool. Setting the size to " << mActiveThreads + numNewWork;
-    int newSize = (int) (mActiveThreads + numNewWork);
-    pool->setSize(newSize);
+    if (mActiveThreads < mMaxPoolSize)
+    {
+        if (newQueueSize > mDefaultPoolSize && newQueueSize < mActiveThreads)
+        {
+            int newSize = std::min(mMaxPoolSize, newQueueSize);
+            lg(Debug) << "SIP's thread pool queue size exceeds default threads. Setting pool size to " << newSize;
+            mActiveThreads = newSize;
+            pool->setSize(newSize);
+        }
+    }
 }
 
 void PJSIPSessionModuleThreadPoolListener::queueEmptied(const PoolPtr& pool)
 {
     //XXX Making this behavior more customizable would be nice
     //
-    //For now, kill off everything
 
-    lg(Debug) << "The queue is empty so we're killing all the threads";
-    pool->setSize(0);
+    if (mActiveThreads > mDefaultPoolSize)
+    {
+        lg(Debug) << "The queue is empty so we're dropping thread pool back to default size.";
+
+        mActiveThreads = mDefaultPoolSize;
+        pool->setSize(mDefaultPoolSize);
+    }
 }
 
 void PJSIPSessionModuleThreadPoolListener::threadStart()
@@ -2419,12 +2453,14 @@ SessionWork::SessionWork(const QueuePtr& queue)
     : mThreadPoolQueue(queue),
     mInternalQueue(new SuspendableWorkQueue(this)) { }
 
-void SessionWork::workAdded(const QueueBasePtr&, Ice::Long, bool wasEmpty)
+void SessionWork::workAdded(const QueueBasePtr&, Ice::Long numWorkAdded, Ice::Long newQueueSize)
 {
-    if (wasEmpty)
+    // Was this WorkQueue empty?
+    if (numWorkAdded == newQueueSize)
     {
         try
         {
+            // We need to be put back into the Pool's queue to be assigned to a worker thread. 
             mThreadPoolQueue->enqueueWork(this);
         }
         catch (const ShuttingDown&)
diff --git a/src/PJSIPSessionModule.h b/src/PJSIPSessionModule.h
index d08b475..1688609 100644
--- a/src/PJSIPSessionModule.h
+++ b/src/PJSIPSessionModule.h
@@ -81,15 +81,18 @@ private:
 class PJSIPSessionModuleThreadPoolListener : public AsteriskSCF::System::ThreadPool::V1::PoolListener
 {
 public:
-    PJSIPSessionModuleThreadPoolListener();
+    PJSIPSessionModuleThreadPoolListener(Ice::Long defaultPoolSize, Ice::Long maxPoolSize);
+    void initialized(const AsteriskSCF::System::ThreadPool::V1::PoolPtr& pool);
     void stateChanged(const AsteriskSCF::System::ThreadPool::V1::PoolPtr& pool, Ice::Long active, Ice::Long idle, Ice::Long zombie);
-    void queueWorkAdded(const AsteriskSCF::System::ThreadPool::V1::PoolPtr& pool, Ice::Long count, bool wasEmpty);
+    void queueWorkAdded(const AsteriskSCF::System::ThreadPool::V1::PoolPtr& pool, Ice::Long count, Ice::Long newQueueSize);
     void queueEmptied(const AsteriskSCF::System::ThreadPool::V1::PoolPtr& pool);
     void threadStart();
     void threadStop();
 private:
     Ice::Long mActiveThreads;
     AsteriskSCF::PJLIB::ThreadHook *mPJLIBHook;
+    Ice::Long mDefaultPoolSize;
+    Ice::Long mMaxPoolSize;
 };
 
 typedef IceUtil::Handle<PJSIPSessionModuleThreadPoolListener> PJSIPSessionModuleThreadPoolListenerPtr;
@@ -127,7 +130,10 @@ typedef std::vector<ReasonId> ReasonIdSeq;
 class PJSIPSessionModule : public PJSIPModule
 {
 public:
-    PJSIPSessionModule(pjsip_endpoint *endpt, const boost::shared_ptr<SIPEndpointFactory>& endpointFactoryPtr,
+    PJSIPSessionModule(
+        const Ice::PropertyDict& sessionModuleProps,
+        pjsip_endpoint *endpt, 
+        const boost::shared_ptr<SIPEndpointFactory>& endpointFactoryPtr,
         const AsteriskSCF::Discovery::SmartProxy<AsteriskSCF::SessionCommunications::V1::SessionRouterPrx>&
             sessionRouter,
         const AsteriskSCF::Core::Discovery::V1::ServiceLocatorPrx& serviceLocator,
diff --git a/src/PJSIPSessionModuleConstruction.cpp b/src/PJSIPSessionModuleConstruction.cpp
index f5fe146..c627778 100644
--- a/src/PJSIPSessionModuleConstruction.cpp
+++ b/src/PJSIPSessionModuleConstruction.cpp
@@ -14,6 +14,8 @@
  * at the top of the source tree.
  */
 
+#include <boost/algorithm/string/predicate.hpp>
+
 #include <AsteriskSCF/Logger.h>
 #include <AsteriskSCF/WorkQueue/WorkQueue.h>
 #include <AsteriskSCF/ThreadPool/ThreadPool.h>
@@ -130,7 +132,25 @@ static void invOnSendReinviteResponse(pjsip_inv_session *inv, pjsip_tx_data *tda
     return sessionModule->invOnSendReinviteResponse(inv, tdata);
 }
 
-PJSIPSessionModule::PJSIPSessionModule(pjsip_endpoint *endpt,
+int getIntValue(const std::string& propertyName, const std::string& propertyValue, int defaultValue)
+{
+    int value = defaultValue;
+    try
+    {
+        value = boost::lexical_cast<int>(propertyValue);
+    }
+    catch(boost::bad_lexical_cast const&)
+    {
+        lg(Error) << "Invalid value for " << propertyName << " specified. Using default of " << defaultValue;
+        return defaultValue;
+    }
+
+    return value;
+}
+
+PJSIPSessionModule::PJSIPSessionModule(
+    const Ice::PropertyDict& sessionModuleProps,
+    pjsip_endpoint *endpt,
     const boost::shared_ptr<SIPEndpointFactory>& endpointFactoryPtr,
     const AsteriskSCF::Discovery::SmartProxy<AsteriskSCF::SessionCommunications::V1::SessionRouterPrx>& sessionRouter,
     const AsteriskSCF::Core::Discovery::V1::ServiceLocatorPrx& serviceLocator,
@@ -142,6 +162,7 @@ PJSIPSessionModule::PJSIPSessionModule(pjsip_endpoint *endpt,
       mReplicationContext(replicationContext), mEndpoint(endpt),
       mSessionCreationExtensionPoint(new SIPSessionCreationExtensionPoint()), mAdapter(adapter)
 {
+
     sessionModule = this;
     mModule.name = pj_str(moduleName);
     mModule.priority = PJSIP_MOD_PRIORITY_APPLICATION;
@@ -163,6 +184,22 @@ PJSIPSessionModule::PJSIPSessionModule(pjsip_endpoint *endpt,
         mSessionCreationExtensionPointPrx, 
         SessionCreationExtensionPointId);
 
+    int defaultThreadPoolSize=10;
+    int maxThreadPoolSize=15;
+
+    Ice::PropertyDict::const_iterator prop;
+    for (prop = sessionModuleProps.begin(); prop != sessionModuleProps.end(); prop++)
+    {
+        if (boost::algorithm::ends_with((*prop).first, "ThreadPool.DefaultSize"))
+        {
+            defaultThreadPoolSize = getIntValue((*prop).first, (*prop).second, defaultThreadPoolSize);
+        }
+        else if (boost::algorithm::ends_with((*prop).first, "ThreadPool.MaxSize"))
+        {
+            maxThreadPoolSize = getIntValue((*prop).first, (*prop).second, maxThreadPoolSize);
+        }
+    }
+
     // TBD... how to access the Component's service and instance ids. 
     mSessionCreationExtensionPointService->addLocatorParams(
         AsteriskSCF::Operations::createContext(), 
@@ -170,7 +207,7 @@ PJSIPSessionModule::PJSIPSessionModule(pjsip_endpoint *endpt,
         "");
 
     mPoolQueue = new AsteriskSCF::WorkQueue::WorkQueue();
-    mPoolListener = new PJSIPSessionModuleThreadPoolListener(); 
+    mPoolListener = new PJSIPSessionModuleThreadPoolListener(defaultThreadPoolSize, maxThreadPoolSize); 
 
     ThreadPoolFactoryPtr factory(new ThreadPoolFactory);
     mPool = factory->createPool(mPoolListener, mPoolQueue);
diff --git a/src/SIPSession.h b/src/SIPSession.h
index e4aa05a..0464b3b 100644
--- a/src/SIPSession.h
+++ b/src/SIPSession.h
@@ -77,7 +77,7 @@ public:
      * Overrides of QueueListener interface
      */
     SessionWork(const AsteriskSCF::System::WorkQueue::V1::QueuePtr& threadPoolQueue);
-    void workAdded(const AsteriskSCF::System::WorkQueue::V1::QueueBasePtr&, Ice::Long newWork, bool wasEmpty);
+    void workAdded(const AsteriskSCF::System::WorkQueue::V1::QueueBasePtr&, Ice::Long newWork, Ice::Long newQueueSize);
     void workResumable(const AsteriskSCF::System::WorkQueue::V1::QueueBasePtr&);
     void emptied(const AsteriskSCF::System::WorkQueue::V1::QueueBasePtr&);
     void shuttingDown(const AsteriskSCF::System::WorkQueue::V1::QueueBasePtr&);

-----------------------------------------------------------------------


-- 
asterisk-scf/integration/sip.git



More information about the asterisk-scf-commits mailing list