[asterisk-scf-commits] asterisk-scf/integration/ice-util-cpp.git branch "thread_pool_update" updated.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Fri Jul 27 15:06:32 CDT 2012


branch "thread_pool_update" has been updated
       via  ee188408f940769bd9860a6cb2ae61fca50ff8c6 (commit)
      from  d50bd6e62bd9a6f58fde0cd65047049ac2b09106 (commit)

Summary of changes:
 include/AsteriskSCF/ThreadPool/ThreadPool.h        |    5 +-
 .../AsteriskSCF/WorkQueue/DefaultQueueListener.h   |    2 +-
 src/ThreadPool/ThreadPool.cpp                      |   65 +++++++++++++++----
 src/WorkQueue/DefaultQueueListener.cpp             |    4 +-
 src/WorkQueue/SuspendableWorkQueue.cpp             |   12 ++--
 src/WorkQueue/WorkQueue.cpp                        |    5 +-
 test/ThreadPool/TestThreadPool.cpp                 |   13 +++-
 test/WorkQueue/TestSuspendableWorkQueue.cpp        |    4 +-
 test/WorkQueue/TestWorkQueue.cpp                   |    4 +-
 9 files changed, 80 insertions(+), 34 deletions(-)


- Log -----------------------------------------------------------------
commit ee188408f940769bd9860a6cb2ae61fca50ff8c6
Author: Ken Hunt <ken.hunt at digium.com>
Date:   Fri Jul 27 15:02:01 2012 -0500

    Thread pool updates to provide additional details to listener.

diff --git a/include/AsteriskSCF/ThreadPool/ThreadPool.h b/include/AsteriskSCF/ThreadPool/ThreadPool.h
index 93846c0..b947965 100644
--- a/include/AsteriskSCF/ThreadPool/ThreadPool.h
+++ b/include/AsteriskSCF/ThreadPool/ThreadPool.h
@@ -52,8 +52,9 @@ class ASTSCF_DLL_EXPORT ThreadPoolFactory : public AsteriskSCF::System::ThreadPo
 {
 public:
     ThreadPoolFactory();
-    AsteriskSCF::System::ThreadPool::V1::PoolPtr createPool(const AsteriskSCF::System::ThreadPool::V1::PoolListenerPtr& listener,
-            const AsteriskSCF::System::WorkQueue::V1::QueuePtr& queue);
+    AsteriskSCF::System::ThreadPool::V1::PoolPtr createPool(
+        const AsteriskSCF::System::ThreadPool::V1::PoolListenerPtr& listener,
+        const AsteriskSCF::System::WorkQueue::V1::QueuePtr& queue);
 private:
 };
 
diff --git a/include/AsteriskSCF/WorkQueue/DefaultQueueListener.h b/include/AsteriskSCF/WorkQueue/DefaultQueueListener.h
index 37df9ea..b1f5fa0 100644
--- a/include/AsteriskSCF/WorkQueue/DefaultQueueListener.h
+++ b/include/AsteriskSCF/WorkQueue/DefaultQueueListener.h
@@ -48,7 +48,7 @@ public:
     DefaultQueueListener(const AsteriskSCF::System::WorkQueue::V1::QueueBasePtr& queue, const Ice::ThreadNotificationPtr& threadHook);
     ~DefaultQueueListener();
     
-    void workAdded(const AsteriskSCF::System::WorkQueue::V1::QueueBasePtr&, Ice::Long numNewWork, bool wasEmpty);
+    void workAdded(const AsteriskSCF::System::WorkQueue::V1::QueueBasePtr&, Ice::Long numNewWork, Ice::Long newQueueSize);
     void workResumable(const AsteriskSCF::System::WorkQueue::V1::QueueBasePtr&);
     void emptied(const AsteriskSCF::System::WorkQueue::V1::QueueBasePtr&);
     void shuttingDown(const AsteriskSCF::System::WorkQueue::V1::QueueBasePtr&);
diff --git a/src/ThreadPool/ThreadPool.cpp b/src/ThreadPool/ThreadPool.cpp
index 005bd27..f4caf13 100644
--- a/src/ThreadPool/ThreadPool.cpp
+++ b/src/ThreadPool/ThreadPool.cpp
@@ -31,7 +31,7 @@ using namespace AsteriskSCF::System::WorkQueue::V1;
 
 class ThreadQueueListener;
     
-typedef std::vector<WorkerThreadPtr> ThreadContainer;
+typedef std::deque<WorkerThreadPtr> ThreadContainer;
 typedef ThreadContainer::iterator ThreadIterator;
 
 class ThreadPool : public Pool, public WorkerThreadListener
@@ -46,6 +46,8 @@ public:
         ThreadQueueListenerFactory factory;
         tqListener = factory.createThreadQueueListener(this);
         mQueue->setListener(tqListener);
+
+        initialized();
     }
 
     ~ThreadPool()
@@ -301,7 +303,8 @@ public:
      * Queued task called when we are notified work has been added to the queue.
      *
      * When executed, we notify our listener that work has been added and we
-     * awaken idle threads.
+     * awaken as many idle threads as we can, up to the number of work items
+     * added.
      *
      * XXX The current method awakens all idle threads. A potential alternative
      * would be to awaken a number of idle threads equal to the number of
@@ -310,33 +313,67 @@ public:
     class WorkAdded : public Work
     {
     public:
-        WorkAdded(Ice::Long numNewWork, bool wasEmpty, ThreadPool *pool)
-            : mNewWork(numNewWork), mWasEmpty(wasEmpty), mPool(pool) { }
+        WorkAdded(Ice::Long numNewWork, Ice::Long newQueueSize, ThreadPool *pool)
+            : mNumNewWork(numNewWork), mNewQueueSize(newQueueSize), mPool(pool) { }
         
         void execute()
         {
-            mPool->mListener->queueWorkAdded(mPool, mNewWork, mWasEmpty);
-            for (ThreadIterator i = mPool->mIdleThreads.begin();
-                    i != mPool->mIdleThreads.end(); ++i)
+            mPool->mListener->queueWorkAdded(mPool, mNumNewWork, mNewQueueSize);
+
+            ThreadIterator i;
+            int count = 0;
+            for (i = mPool->mIdleThreads.begin();
+                 i != mPool->mIdleThreads.end() && count < mNumNewWork; ++i, ++count)
             {
                 mPool->mActiveThreads.push_back(*i);
                 (*i)->setState(Alive);
             }
-            mPool->mIdleThreads.erase(mPool->mIdleThreads.begin(), mPool->mIdleThreads.end());
+            mPool->mIdleThreads.erase(mPool->mIdleThreads.begin(), i);
         }
     private:
-        const Ice::Long mNewWork;
-        const bool mWasEmpty;
+        const Ice::Long mNumNewWork;
+        const Ice::Long mNewQueueSize;
         ThreadPool *mPool;
     };
 
 
-    void handleWorkAdded(Ice::Long numNewWork, bool wasEmpty)
+    void handleWorkAdded(Ice::Long numNewWork, Ice::Long newQueueSize)
+    {
+        boost::lock_guard<boost::mutex> lock(mQueueLock);
+        if (!mShuttingDown)
+        {
+            mControlQueue->enqueueWork(new WorkAdded(numNewWork, newQueueSize, this));
+        }
+    }
+
+    /**
+     * Queued task called when the ThreadPool is initialized.
+     *
+     * When executed, we notify our listener that the pool is fully initialized.
+     * By default there are no threads, so this allows the listener to specify
+     * a pool size.
+     *
+     */
+    class Initialized : public Work
+    {
+    public:
+        Initialized(ThreadPool *pool)
+            : mPool(pool) { }
+        
+        void execute()
+        {
+            mPool->mListener->initialized(mPool);
+        }
+    private:
+        ThreadPool *mPool;
+    };
+
+    void initialized()
     {
         boost::lock_guard<boost::mutex> lock(mQueueLock);
         if (!mShuttingDown)
         {
-            mControlQueue->enqueueWork(new WorkAdded(numNewWork, wasEmpty, this));
+            mControlQueue->enqueueWork(new Initialized(this));
         }
     }
 
@@ -419,9 +456,9 @@ public:
     /**
      * Results in PoolListener::queueWorkAdded being called
      */
-    void workAdded(const QueueBasePtr&, Ice::Long numNewWork, bool wasEmpty)
+    void workAdded(const QueueBasePtr&, Ice::Long numNewWork, Ice::Long newQueueSize)
     {
-        mThreadPool->handleWorkAdded(numNewWork, wasEmpty);
+        mThreadPool->handleWorkAdded(numNewWork, newQueueSize);
     }
 
     /**
diff --git a/src/WorkQueue/DefaultQueueListener.cpp b/src/WorkQueue/DefaultQueueListener.cpp
index 4d46860..2376274 100644
--- a/src/WorkQueue/DefaultQueueListener.cpp
+++ b/src/WorkQueue/DefaultQueueListener.cpp
@@ -108,9 +108,9 @@ DefaultQueueListener::~DefaultQueueListener()
     }
 }
 
-void DefaultQueueListener::workAdded(const QueueBasePtr&, Ice::Long, bool wasEmpty)
+void DefaultQueueListener::workAdded(const QueueBasePtr&, Ice::Long numWorkAdded, Ice::Long newQueueSize)
 {
-    if (wasEmpty)
+    if (numWorkAdded == newQueueSize) 
     {
         mPriv->setDead(false);
     }
diff --git a/src/WorkQueue/SuspendableWorkQueue.cpp b/src/WorkQueue/SuspendableWorkQueue.cpp
index 0419bb3..6939e36 100644
--- a/src/WorkQueue/SuspendableWorkQueue.cpp
+++ b/src/WorkQueue/SuspendableWorkQueue.cpp
@@ -243,39 +243,39 @@ SuspendableWorkQueue::SuspendableWorkQueue(const QueueListenerPtr& listener)
 
 void SuspendableWorkQueue::enqueueWork(const SuspendableWorkPtr& work)
 {
-    bool wasEmpty;
+    Ice::Long newSize = 0;
     QueueListenerPtr listenerRef;
     {
         boost::unique_lock<boost::shared_mutex> lock(mPriv->mLock);
         mPriv->checkForShuttingDown();
         //Call private version so we don't double grab the lock
-        wasEmpty = mPriv->getSize() == 0; 
+        newSize = mPriv->getSize() + 1; 
         mPriv->mQueue.push_back(work);
         listenerRef = mPriv->mListener;
     }
 
     if (listenerRef != 0)
     {
-        listenerRef->workAdded(this, 1, wasEmpty);
+        listenerRef->workAdded(this, 1, newSize);
     }
 }
 
 void SuspendableWorkQueue::enqueueWorkSeq(const SuspendableWorkSeq& works)
 {
-    bool wasEmpty;
+    int newSize = 0;
     QueueListenerPtr listenerRef;
     {
         boost::unique_lock<boost::shared_mutex> lock(mPriv->mLock);
         mPriv->checkForShuttingDown();
         //Call private version so we don't double grab the lock
-        wasEmpty = mPriv->getSize() == 0;
+        newSize = mPriv->getSize() + works.size();
         mPriv->mQueue.insert(mPriv->mQueue.end(), works.begin(), works.end());
         listenerRef = mPriv->mListener;
     }
 
     if (listenerRef != 0)
     {
-        listenerRef->workAdded(this, works.size(), wasEmpty);
+        listenerRef->workAdded(this, works.size(), newSize);
     }
 }
 
diff --git a/src/WorkQueue/WorkQueue.cpp b/src/WorkQueue/WorkQueue.cpp
index 34a022e..0e3eca8 100644
--- a/src/WorkQueue/WorkQueue.cpp
+++ b/src/WorkQueue/WorkQueue.cpp
@@ -102,18 +102,19 @@ void WorkQueue::enqueueWork(const WorkPtr& work)
 void WorkQueue::enqueueWorkSeq(const WorkSeq& works)
 {
     bool wasEmpty;
+    int newSize=0;
     QueueListenerPtr listenerRef;
     {
         boost::unique_lock<boost::shared_mutex> lock(mPriv->mLock);
         mPriv->checkForShuttingDown();
-        wasEmpty = mPriv->mQueue.empty();
         listenerRef = mPriv->mListener;
         mPriv->mQueue.insert(mPriv->mQueue.end(), works.begin(), works.end());
+        newSize = mPriv->mQueue.size();
     }
 
     if (listenerRef != 0)
     {
-        listenerRef->workAdded(this, static_cast<long>(works.size()), wasEmpty);
+        listenerRef->workAdded(this, static_cast<long>(works.size()), newSize);
     }
 }
 
diff --git a/test/ThreadPool/TestThreadPool.cpp b/test/ThreadPool/TestThreadPool.cpp
index 5f77851..e2356a8 100644
--- a/test/ThreadPool/TestThreadPool.cpp
+++ b/test/ThreadPool/TestThreadPool.cpp
@@ -31,7 +31,7 @@ class TestListener : public PoolListener
 {
 public:
     TestListener() : mActive(0), mIdle(0), mZombie(0), mTasks(0), 
-        mWorkAddedNotice(false), mWasEmpty(false), mEmptyNotice(false) { }
+        mWorkAddedNotice(false), mWasEmpty(false), mEmptyNotice(false), mInitialized(false) { }
     
     void stateChanged(const PoolPtr&, Ice::Long active, Ice::Long idle, Ice::Long zombie)
     {
@@ -42,11 +42,11 @@ public:
         mDone.notify_one();
     }
 
-    void queueWorkAdded(const PoolPtr&, Ice::Long count, bool wasEmpty)
+    void queueWorkAdded(const PoolPtr&, Ice::Long count, Ice::Long newQueueSize)
     {
         boost::lock_guard<boost::mutex> lock(mLock);
         mTasks = count;
-        mWasEmpty = wasEmpty;
+        mWasEmpty = (newQueueSize == count);
         mWorkAddedNotice = true;
         mDone.notify_one();
     }
@@ -57,6 +57,11 @@ public:
         mEmptyNotice = true;
     }
 
+    void initialized(const PoolPtr&)
+    {
+        mInitialized = true;
+    }
+
     void threadStart()
     {
     }
@@ -73,6 +78,7 @@ public:
     bool mWorkAddedNotice;
     bool mWasEmpty;
     bool mEmptyNotice;
+    bool mInitialized;
 
     boost::mutex mLock;
     boost::condition_variable mDone;
@@ -187,6 +193,7 @@ BOOST_AUTO_TEST_CASE(addWork)
 
     waitForWorkNotice(listener);
 
+    BOOST_CHECK(listener->mInitialized == true);
     BOOST_CHECK(listener->mWorkAddedNotice == true);
     BOOST_CHECK(listener->mWasEmpty == true);
     BOOST_CHECK(listener->mTasks == 1);
diff --git a/test/WorkQueue/TestSuspendableWorkQueue.cpp b/test/WorkQueue/TestSuspendableWorkQueue.cpp
index 7e6d6a0..ed75dab 100644
--- a/test/WorkQueue/TestSuspendableWorkQueue.cpp
+++ b/test/WorkQueue/TestSuspendableWorkQueue.cpp
@@ -32,10 +32,10 @@ public:
         resumableNotice(false),
         shutdownNotice(false) { }
 
-    void workAdded(const QueueBasePtr&, Ice::Long, bool wasEmpty)
+    void workAdded(const QueueBasePtr&, Ice::Long added, Ice::Long newSize)
     {
         addedNotice = true;
-        addedEmptyNotice = wasEmpty;
+        addedEmptyNotice = (newSize == added);
     }
 
     void emptied(const QueueBasePtr&)
diff --git a/test/WorkQueue/TestWorkQueue.cpp b/test/WorkQueue/TestWorkQueue.cpp
index 99dd979..dc842a8 100644
--- a/test/WorkQueue/TestWorkQueue.cpp
+++ b/test/WorkQueue/TestWorkQueue.cpp
@@ -32,10 +32,10 @@ public:
         addedEmptyNotice(false),
         emptyNotice(false), shutdownNotice(false) { }
 
-    void workAdded(const QueueBasePtr&, Ice::Long numNewWork, bool wasEmpty)
+    void workAdded(const QueueBasePtr&, Ice::Long numNewWork, Ice::Long newQueueSize)
     {
         addedNotice = true;
-        addedEmptyNotice = wasEmpty;
+        addedEmptyNotice = (newQueueSize == numNewWork);
         tasksAdded = numNewWork;
     }
 

-----------------------------------------------------------------------


-- 
asterisk-scf/integration/ice-util-cpp.git



More information about the asterisk-scf-commits mailing list