[asterisk-scf-commits] asterisk-scf/integration/ice-util-cpp.git branch "workqueue" updated.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Mon Apr 18 16:04:15 CDT 2011


branch "workqueue" has been updated
       via  124bbd1334c58303bfacd7b420473deb80adcd37 (commit)
       via  5b2ad8956a46f4f85a6e461f3434e26ac279f7db (commit)
       via  2f35c1590b2a69219ebe9c11e8648e5e2ec08152 (commit)
       via  56e6236802c7329662980ba9909ccc09babdaa78 (commit)
       via  0e2d0969783f84856ba601129dd8d28bae5af8a9 (commit)
       via  8b0f7cf8492427b05847467a0495b6ae3412a995 (commit)
       via  7f49137d87f5d39ba167e28645d5999fb8ff824f (commit)
       via  c88fc99c23a531e3c32cc244188d4c59c0cac582 (commit)
       via  3b857b2c97fc71a498dd7858a277509fe26b970a (commit)
      from  e62d3ce2544ac6f072e0be683656f486eed41252 (commit)

Summary of changes:
 ThreadPool/include/AsteriskSCF/ThreadPool.h        |   29 ++-
 ThreadPool/include/AsteriskSCF/WorkerThread.h      |   35 ++-
 ThreadPool/src/ThreadPool.cpp                      |  314 ++++++++++++--------
 ThreadPool/test/TestThreadPool.cpp                 |  299 +++++--------------
 .../include/AsteriskSCF/DefaultQueueListener.h     |   14 +
 WorkQueue/src/SuspendableWorkQueue.cpp             |    8 +
 WorkQueue/src/WorkQueue.cpp                        |    8 +
 7 files changed, 347 insertions(+), 360 deletions(-)


- Log -----------------------------------------------------------------
commit 124bbd1334c58303bfacd7b420473deb80adcd37
Author: Mark Michelson <mmichelson at digium.com>
Date:   Mon Apr 18 16:03:16 2011 -0500

    Explicitly destroy the SuspendableWorkQueue's listener in its destructor.

diff --git a/WorkQueue/src/SuspendableWorkQueue.cpp b/WorkQueue/src/SuspendableWorkQueue.cpp
index 839cc1a..9893212 100644
--- a/WorkQueue/src/SuspendableWorkQueue.cpp
+++ b/WorkQueue/src/SuspendableWorkQueue.cpp
@@ -35,6 +35,14 @@ public:
     SuspendableWorkQueuePriv(const QueueListenerPtr& listener)
         : mListener(listener), state(Ready), currentWork(0) { }
 
+    ~SuspendableWorkQueuePriv()
+    {
+        // We explicitly destroy the listener to ensure
+        // that it gets destroyed before the lock, which
+        // may be in use at the moment.
+        mListener = 0;
+    }
+
     /**
      * mLock is expected to be held when this function is called
      */

commit 5b2ad8956a46f4f85a6e461f3434e26ac279f7db
Author: Mark Michelson <mmichelson at digium.com>
Date:   Mon Apr 18 15:58:45 2011 -0500

    Add an explanation to DefaultQueueListener.

diff --git a/WorkQueue/include/AsteriskSCF/DefaultQueueListener.h b/WorkQueue/include/AsteriskSCF/DefaultQueueListener.h
index 9557b05..a7fb39b 100644
--- a/WorkQueue/include/AsteriskSCF/DefaultQueueListener.h
+++ b/WorkQueue/include/AsteriskSCF/DefaultQueueListener.h
@@ -25,6 +25,20 @@ namespace WorkQueue
 
 class DefaultQueueListenerPriv;
 
+/**
+ * A sample implementation of a QueueListener
+ *
+ * This implementation creates a thread that
+ * executes work on a Queue if there is work
+ * to be done. If the Queue has no work to be
+ * done, then the thread goes idle until new
+ * work is added.
+ *
+ * Since this will likely be a common way of
+ * implementing QueueListeners, this one is
+ * provided as part of the WorkQueue library
+ * as a matter of convenience.
+ */
 class DefaultQueueListener : public AsteriskSCF::System::WorkQueue::V1::QueueListener
 {
 public:

commit 2f35c1590b2a69219ebe9c11e8648e5e2ec08152
Author: Mark Michelson <mmichelson at digium.com>
Date:   Mon Apr 18 15:53:56 2011 -0500

    Several changes for the sake of cleanup
    
    * Use pImpl idiom on WorkerThreads
    * Add comments all over explaining things
    * Use some typedefs to make things both easier to read
      and easier to change if the type of container should
      be changed.

diff --git a/ThreadPool/include/AsteriskSCF/ThreadPool.h b/ThreadPool/include/AsteriskSCF/ThreadPool.h
index 9a1e662..0d38c76 100644
--- a/ThreadPool/include/AsteriskSCF/ThreadPool.h
+++ b/ThreadPool/include/AsteriskSCF/ThreadPool.h
@@ -27,6 +27,8 @@ namespace ThreadPool
 
 class ThreadPoolPriv;
 
+// For more information on these methods, see
+// AsteriskSCF/System/ThreadPool/ThreadPoolIf.ice
 class ThreadPool : public AsteriskSCF::System::ThreadPool::V1::Pool
 {
 public:
@@ -37,19 +39,42 @@ private:
     boost::shared_ptr<ThreadPoolPriv> mThreadPoolPriv;
 };
 
+/**
+ * An implementation of a QueueListener used by a ThreadPool.
+ *
+ * The main job of this listener is to redirect the events to the
+ * Pool's PoolListener.
+ *
+ * For more information on the QueueListener's methods, see
+ * AsteriskSCF/System/WorkQueue/WorkQueueIf.ice
+ */
 class ThreadQueueListener : public AsteriskSCF::System::WorkQueue::V1::QueueListener
 {
 public:
-    ThreadQueueListener(ThreadPoolPriv *something,
+    ThreadQueueListener(ThreadPoolPriv *Priv,
             AsteriskSCF::System::ThreadPool::V1::Pool *pool);
+    /**
+     * Results in PoolListener::queueWorkAdded being called
+     */
     void workAdded(int numNewWork, bool wasEmpty);
+
+    /**
+     * Should never be called since a ThreadPool does not
+     * use a SuspendableQueue
+     */
     void workResumable();
+
+    /**
+     * Results in PoolListener::queueEmptied being called
+     */
     void emptied();
 private:
     ThreadPoolPriv *mThreadPoolPriv;
     AsteriskSCF::System::ThreadPool::V1::Pool *mPool;
 };
 
+// For more information on these methods, see
+// AsteriskSCF/System/ThreadPool/ThreadPoolIf.ice
 class ThreadPoolFactory : public AsteriskSCF::System::ThreadPool::V1::PoolFactory
 {
 public:
@@ -59,5 +84,7 @@ public:
 private:
 };
 
+typedef IceUtil::Handle<ThreadPoolFactory> ThreadPoolFactoryPtr;
+
 }; // end namespace ThreadPool
 }; // end namespace AsteriskSCF
diff --git a/ThreadPool/include/AsteriskSCF/WorkerThread.h b/ThreadPool/include/AsteriskSCF/WorkerThread.h
index 187c616..741be3a 100644
--- a/ThreadPool/include/AsteriskSCF/WorkerThread.h
+++ b/ThreadPool/include/AsteriskSCF/WorkerThread.h
@@ -39,47 +39,52 @@ enum ThreadState
      * Marked for deletion. May still be executing
      * code but next time it checks its state, it
      * will die.
-     *
-     * Basically what happens when we try to kill
-     * a thread when it is Active
      */
     Zombie,
     /**
      * The ThreadPoolImpl considers this thread to
      * be gone. The thread just needs to get out of
      * the way ASAP.
-     *
-     * Basically what happens when we try to kill
-     * a thread when it is Idle
      */
     Dead
 };
 
 class WorkerThreadListener;
 
+class WorkerThreadPriv;
+
 class WorkerThread
 {
 public:
     WorkerThread(const AsteriskSCF::System::WorkQueue::V1::QueuePtr& workQueue, WorkerThreadListener *listener);
+    /**
+     * Tells a worker thread to change states
+     *
+     * @param newState The state to change to
+     */
     void poke(ThreadState newState);
+    /**
+     * Wait for the worker thread to complete
+     */
     void join();
 
 private:
-    void active();
-    void idle();
-
-    ThreadState mState;
-    WorkerThreadListener *mListener;
-    AsteriskSCF::System::WorkQueue::V1::QueuePtr mQueue;
-    boost::condition_variable mCond;
-    boost::mutex mLock;
-    boost::thread mThread;
+    boost::shared_ptr<WorkerThreadPriv> mPriv;
 };
 
+/**
+ * A listener for WorkerThread state changes
+ */
 class WorkerThreadListener
 {
 public:
+    /**
+     * An active thread has become idle
+     */
     virtual void activeThreadIdle(WorkerThread *thread) = 0;
+    /**
+     * A zombie thread has died
+     */
     virtual void zombieThreadDead(WorkerThread *thread) = 0;
 };
 
diff --git a/ThreadPool/src/ThreadPool.cpp b/ThreadPool/src/ThreadPool.cpp
index 07ee101..cc8542f 100644
--- a/ThreadPool/src/ThreadPool.cpp
+++ b/ThreadPool/src/ThreadPool.cpp
@@ -29,76 +29,97 @@ namespace ThreadPool
 using namespace AsteriskSCF::System::ThreadPool::V1;
 using namespace AsteriskSCF::System::WorkQueue::V1;
 
-WorkerThread::WorkerThread(const QueuePtr& workQueue, WorkerThreadListener *listener)
-    : mState(Active), mListener(listener), mQueue(workQueue), mThread(boost::bind(&WorkerThread::active, this)) { }
-
-void WorkerThread::active()
+class WorkerThreadPriv
 {
-    while (mState == Active)
+public:
+    WorkerThreadPriv(const QueuePtr& workQueue, WorkerThreadListener *listener,
+            WorkerThread *workerThread)
+        : mState(Active), mListener(listener), mQueue(workQueue), mWorkerThread(workerThread),
+        mThread(boost::bind(&WorkerThreadPriv::active, this)) { }
+
+    void active()
     {
-        if (!mQueue->executeWork())
+        while (mState == Active)
         {
-            idle();
+            if (!mQueue->executeWork())
+            {
+                idle();
+            }
         }
-    }
 
-    // Reaching this portion means the thread is
-    // on death's door. It may have been killed while
-    // it was idle, in which case it can just die
-    // peacefully. If it's a zombie, though, then
-    // it needs to let the ThreadPoolImpl know so
-    // that the thread can be removed from the
-    // vector of zombie threads.
-    if (mState == Zombie)
-    {
-        mListener->zombieThreadDead(this);
+        // Reaching this portion means the thread is
+        // on death's door. It may have been killed while
+        // it was idle, in which case it can just die
+        // peacefully. If it's a zombie, though, then
+        // it needs to let the ThreadPoolImpl know so
+        // that the thread can be removed from the
+        // vector of zombie threads.
+        if (mState == Zombie)
+        {
+            mListener->zombieThreadDead(mWorkerThread);
+        }
     }
-}
 
-void WorkerThread::idle()
-{
+    void idle()
     {
-        boost::unique_lock<boost::mutex> lock(mLock);
-        // If we've been turned into a zombie while we
-        // were active, then just go ahead and return.
-        if (mState == Zombie)
         {
-            return;
-        }
+            boost::unique_lock<boost::mutex> lock(mLock);
+            // If we've been turned into a zombie while we
+            // were active, then just go ahead and return.
+            if (mState == Zombie)
+            {
+                return;
+            }
 
-        // Otherwise, we'll set ourselves idle and wait
-        // for a poke
-        mState = Idle;
-    }
+            // Otherwise, we'll set ourselves idle and wait
+            // for a poke
+            mState = Idle;
+        }
 
-    mListener->activeThreadIdle(this);
+        mListener->activeThreadIdle(mWorkerThread);
 
-    {
-        boost::unique_lock<boost::mutex> lock(mLock);
-        while (mState == Idle)
         {
-            mCond.wait(lock);
+            boost::unique_lock<boost::mutex> lock(mLock);
+            while (mState == Idle)
+            {
+                mCond.wait(lock);
+            }
         }
     }
-}
+
+    ThreadState mState;
+    WorkerThreadListener *mListener;
+    QueuePtr mQueue;
+    WorkerThread *mWorkerThread;
+    boost::condition_variable mCond;
+    boost::mutex mLock;
+    boost::thread mThread;
+};
+
+WorkerThread::WorkerThread(const QueuePtr& workQueue, WorkerThreadListener *listener)
+    : mPriv(new WorkerThreadPriv(workQueue, listener, this)) { }
 
 void WorkerThread::poke(ThreadState newState)
 {
-    boost::unique_lock<boost::mutex> lock(mLock);
-    mState = newState;
-    mCond.notify_one();
+    boost::unique_lock<boost::mutex> lock(mPriv->mLock);
+    mPriv->mState = newState;
+    mPriv->mCond.notify_one();
 }
 
 void WorkerThread::join()
 {
-    mThread.join();
+    mPriv->mThread.join();
 }
 
 class ThreadQueueListener;
+    
+typedef std::vector<WorkerThread*> ThreadContainer;
+typedef ThreadContainer::iterator ThreadIterator;
 
 class ThreadPoolPriv : public WorkerThreadListener
 {
 public:
+
     ThreadPoolPriv(const PoolListenerPtr& listener, const QueuePtr& queue, Pool *pool)
         : mControlQueue(new AsteriskSCF::WorkQueue::WorkQueue), mListener(listener), mQueue(queue), mPool(pool), mShuttingDown(false)
     {
@@ -115,7 +136,6 @@ public:
         //First, we make it known we are shutting down, and kill off
         //the control queue so that no more operations that might
         //move WorkerThreads from their current containers may run.
-
         {
             boost::lock_guard<boost::mutex> lock(mQueueLock);
             mShuttingDown = true;
@@ -123,9 +143,7 @@ public:
         }
 
         //Now WorkerThreads are stuck in their current containers. All
-        //we need to do now is to kill them off. Poking them into the
-        //Dead state, joining, and then deleting will do the trick.
-
+        //we need to do now is to kill them off.
         killThreads(mIdleThreads, mIdleThreads.begin(), mIdleThreads.end());
         killThreads(mActiveThreads, mActiveThreads.begin(), mActiveThreads.end());
         killThreads(mZombieThreads, mZombieThreads.begin(), mZombieThreads.end());
@@ -139,10 +157,10 @@ public:
         mListener->stateChanged(mPool, activeSize, idleSize, zombieSize);
     }
 
-    void killThreads(std::vector<WorkerThread*>& container,
-            std::vector<WorkerThread*>::iterator first, std::vector<WorkerThread*>::iterator last)
+    void killThreads(ThreadContainer& container,
+            ThreadIterator first, ThreadIterator last)
     {
-        for (std::vector<WorkerThread*>::iterator iter = first; iter != last; ++iter)
+        for (ThreadIterator iter = first; iter != last; ++iter)
         {
             WorkerThread *doomed = *iter;
             doomed->poke(Dead);
@@ -152,9 +170,9 @@ public:
         container.erase(first, last);
     }
 
-    void zombifyActiveThreads(std::vector<WorkerThread*>::iterator first, std::vector<WorkerThread*>::iterator last)
+    void zombifyActiveThreads(ThreadIterator first, ThreadIterator last)
     {
-        for (std::vector<WorkerThread*>::iterator iter = first; iter != last; ++iter)
+        for (ThreadIterator iter = first; iter != last; ++iter)
         {
             mZombieThreads.push_back(*iter);
             (*iter)->poke(Zombie);
@@ -162,6 +180,13 @@ public:
         mActiveThreads.erase(first, last);
     }
 
+    /**
+     * Queued task that moves an active thread to the idle thread container.
+     *
+     * This method is queued on the control queue via the
+     * activeThreadIdle() method and is always enqueued by
+     * worker threads.
+     */
     class ActiveThreadIdle : public Work
     {
     public:
@@ -170,7 +195,7 @@ public:
 
         void execute()
         {
-            std::vector<WorkerThread*>::iterator iter =
+            ThreadIterator iter =
                 std::find(mPriv->mActiveThreads.begin(),
                         mPriv->mActiveThreads.end(), mWorkerThread);
     
@@ -198,6 +223,13 @@ public:
         }
     }
 
+    /**
+     * Queued task that deletes and removes a zombie thread from the zombie thread container.
+     *
+     * This method is queued on the control queue via the
+     * zombieThreadDead() method and is always enqueued
+     * by worker threads.
+     */
     class ZombieThreadDead : public Work
     {
     public:
@@ -206,9 +238,8 @@ public:
 
         void execute()
         {
-            std::vector<WorkerThread*>::iterator i =
-                std::find(mPriv->mZombieThreads.begin(),
-                        mPriv->mZombieThreads.end(), mWorkerThread);
+            ThreadIterator i = std::find(mPriv->mZombieThreads.begin(),
+                    mPriv->mZombieThreads.end(), mWorkerThread);
             if (i != mPriv->mZombieThreads.end())
             {
                 mPriv->killThreads(mPriv->mZombieThreads, i, i + 1);
@@ -232,6 +263,13 @@ public:
         }
     }
 
+    /**
+     * Queued task that causes the thread pool to gain or lose threads
+     *
+     * This is enqueued as a result of calling the resize() method,
+     * which is invoked by ThreadPool::setSize(). This may be called
+     * from any thread.
+     */
     class Resize : public Work
     {
     public:
@@ -258,6 +296,13 @@ public:
             mPriv->sendStateChanged();
         }
     private:
+        /**
+         * Add more threads to the pool.
+         *
+         * All new threads start as Active, so our method here
+         * is simply to add new threads to the end of the
+         * active thread container.
+         */
         void grow(size_t numNewThreads)
         {
             for (size_t i = 0; i < numNewThreads; ++i)
@@ -267,13 +312,30 @@ public:
             }
         }
         
+        /**
+         * Remove threads from the pool.
+         *
+         * The methodology here is pretty simple. First, we eliminate
+         * idle threads since they're not doing anything and can die
+         * easily. If after killing all idle threads we still need
+         * to remove more threads, then the next thing to do is to
+         * zombify some active threads.
+         */
         void shrink(size_t threadsToKill)
         {
-            size_t idleThreadsToKill = threadsToKill <= mPriv->mIdleThreads.size() ? threadsToKill : mPriv->mIdleThreads.size();
+            size_t idleThreadsToKill = threadsToKill <= mPriv->mIdleThreads.size() ?
+                threadsToKill : mPriv->mIdleThreads.size();
             size_t activeThreadsToZombify = threadsToKill - idleThreadsToKill;
 
-            mPriv->killThreads(mPriv->mIdleThreads, mPriv->mIdleThreads.begin(), mPriv->mIdleThreads.begin() + idleThreadsToKill);
-            mPriv->zombifyActiveThreads(mPriv->mActiveThreads.begin(), mPriv->mActiveThreads.begin() + activeThreadsToZombify);
+            mPriv->killThreads(mPriv->mIdleThreads, mPriv->mIdleThreads.begin(),
+                    mPriv->mIdleThreads.begin() + idleThreadsToKill);
+
+            //This isn't really necessary since Resize::execute() checked the
+            //size prior to calling shrink. I'm just paranoid :)
+            assert(activeThreadsToZombify <= mPriv->mActiveThreads.size());
+
+            mPriv->zombifyActiveThreads(mPriv->mActiveThreads.begin(),
+                    mPriv->mActiveThreads.begin() + activeThreadsToZombify);
         }
 
         const size_t mNumThreads;
@@ -292,15 +354,36 @@ public:
         }
     }
 
+    //The control queue is where operations to modify
+    //the thread pool are queued. This allows for thread
+    //pool listeners to guarantee that the notifications
+    //they receive arrive in the order that the thread
+    //pool processed the operations that caused those
+    //notifications to be sent.
+    //
+    //This also has the benefit of eliminating locking
+    //from the majority of the thread pool.
     QueuePtr mControlQueue;
-    AsteriskSCF::System::ThreadPool::V1::PoolListenerPtr mListener;
-    AsteriskSCF::System::WorkQueue::V1::QueuePtr mQueue;
-    AsteriskSCF::System::ThreadPool::V1::Pool *mPool;
-    
-    std::vector<WorkerThread*> mActiveThreads;
-    std::vector<WorkerThread*> mIdleThreads;
-    std::vector<WorkerThread*> mZombieThreads;
 
+    PoolListenerPtr mListener;
+    QueuePtr mQueue;
+    
+    //The ThreadPoolPriv has to keep a pointer to
+    //the public portion of the Pool since all listener
+    //methods take a pointer to the Pool as a parameter.
+    Pool *mPool;
+    
+    ThreadContainer mActiveThreads;
+    ThreadContainer mIdleThreads;
+    ThreadContainer mZombieThreads;
+
+    //These two members are required solely to make
+    //destruction of the thread pool safe. Since destruction
+    //of the thread pool is the only operation that does not
+    //go through the control queue, we have to be careful
+    //to make sure that no one attempts to make use of the
+    //control queue once we are in the process of
+    //destroying the thread pool.
     bool mShuttingDown;
     boost::mutex mQueueLock;
 };
@@ -318,7 +401,7 @@ void ThreadQueueListener::workAdded(int numNewWork, bool wasEmpty)
     //
     // A potential alternative would be to poke a number of idle threads equal to the
     // new work count.
-    for (std::vector<WorkerThread*>::iterator i = mThreadPoolPriv->mIdleThreads.begin();
+    for (ThreadIterator i = mThreadPoolPriv->mIdleThreads.begin();
             i != mThreadPoolPriv->mIdleThreads.end(); ++i)
     {
         (*i)->poke(Active);
@@ -356,5 +439,12 @@ QueuePtr ThreadPool::getQueue()
     return mThreadPoolPriv->mQueue;
 }
 
+ThreadPoolFactory::ThreadPoolFactory() { }
+
+PoolPtr ThreadPoolFactory::createPool(const PoolListenerPtr& listener, const QueuePtr& queue)
+{
+    return new ThreadPool(listener, queue);
+}
+
 }; // end namespace ThreadPool
 }; // end namespace AsteriskSCF
diff --git a/ThreadPool/test/TestThreadPool.cpp b/ThreadPool/test/TestThreadPool.cpp
index ef790ce..7417063 100644
--- a/ThreadPool/test/TestThreadPool.cpp
+++ b/ThreadPool/test/TestThreadPool.cpp
@@ -153,7 +153,8 @@ BOOST_AUTO_TEST_CASE(addWork)
     TestListenerPtr listener(new TestListener);
     QueuePtr queue(new WorkQueue());
     SimpleTaskPtr work(new SimpleTask());
-    PoolPtr pool(new ThreadPool(listener, queue));
+    ThreadPoolFactoryPtr factory(new ThreadPoolFactory);
+    PoolPtr pool = factory->createPool(listener, queue);
 
     queue->enqueueWork(work);
 

commit 56e6236802c7329662980ba9909ccc09babdaa78
Author: Mark Michelson <mmichelson at digium.com>
Date:   Mon Apr 18 14:29:11 2011 -0500

    Add helper methods for killing and zombifying threads.

diff --git a/ThreadPool/src/ThreadPool.cpp b/ThreadPool/src/ThreadPool.cpp
index 3e05d1d..07ee101 100644
--- a/ThreadPool/src/ThreadPool.cpp
+++ b/ThreadPool/src/ThreadPool.cpp
@@ -125,33 +125,10 @@ public:
         //Now WorkerThreads are stuck in their current containers. All
         //we need to do now is to kill them off. Poking them into the
         //Dead state, joining, and then deleting will do the trick.
-        while (!mIdleThreads.empty())
-        {
-            std::vector<WorkerThread*>::iterator i = mIdleThreads.begin();
-            WorkerThread *doomed = *i;
-            mIdleThreads.erase(i);
-            doomed->poke(Dead);
-            doomed->join();
-            delete doomed;
-        }
-        while (!mActiveThreads.empty())
-        {
-            std::vector<WorkerThread*>::iterator i = mActiveThreads.begin();
-            WorkerThread *doomed = *i;
-            mActiveThreads.erase(i);
-            doomed->poke(Dead);
-            doomed->join();
-            delete doomed;
-        }
-        while (!mZombieThreads.empty())
-        {
-            std::vector<WorkerThread*>::iterator i = mZombieThreads.begin();
-            WorkerThread *doomed = *i;
-            mZombieThreads.erase(i);
-            doomed->poke(Dead);
-            doomed->join();
-            delete doomed;
-        }
+
+        killThreads(mIdleThreads, mIdleThreads.begin(), mIdleThreads.end());
+        killThreads(mActiveThreads, mActiveThreads.begin(), mActiveThreads.end());
+        killThreads(mZombieThreads, mZombieThreads.begin(), mZombieThreads.end());
     }
 
     void sendStateChanged()
@@ -162,6 +139,29 @@ public:
         mListener->stateChanged(mPool, activeSize, idleSize, zombieSize);
     }
 
+    void killThreads(std::vector<WorkerThread*>& container,
+            std::vector<WorkerThread*>::iterator first, std::vector<WorkerThread*>::iterator last)
+    {
+        for (std::vector<WorkerThread*>::iterator iter = first; iter != last; ++iter)
+        {
+            WorkerThread *doomed = *iter;
+            doomed->poke(Dead);
+            doomed->join();
+            delete doomed;
+        }
+        container.erase(first, last);
+    }
+
+    void zombifyActiveThreads(std::vector<WorkerThread*>::iterator first, std::vector<WorkerThread*>::iterator last)
+    {
+        for (std::vector<WorkerThread*>::iterator iter = first; iter != last; ++iter)
+        {
+            mZombieThreads.push_back(*iter);
+            (*iter)->poke(Zombie);
+        }
+        mActiveThreads.erase(first, last);
+    }
+
     class ActiveThreadIdle : public Work
     {
     public:
@@ -211,9 +211,7 @@ public:
                         mPriv->mZombieThreads.end(), mWorkerThread);
             if (i != mPriv->mZombieThreads.end())
             {
-                WorkerThread *doomed = *i;
-                mPriv->mZombieThreads.erase(i);
-                delete doomed;
+                mPriv->killThreads(mPriv->mZombieThreads, i, i + 1);
             }
             mPriv->sendStateChanged();
         }
@@ -242,7 +240,7 @@ public:
         void execute()
         {
             //We don't count zombie threads as being "live" when potentially resizing
-            int currentSize = mPriv->mActiveThreads.size() + mPriv->mIdleThreads.size();
+            size_t currentSize = mPriv->mActiveThreads.size() + mPriv->mIdleThreads.size();
 
             if (currentSize == mNumThreads)
             {
@@ -260,50 +258,25 @@ public:
             mPriv->sendStateChanged();
         }
     private:
-        void grow(int numNewThreads)
+        void grow(size_t numNewThreads)
         {
-            for (int i = 0; i < numNewThreads; ++i)
+            for (size_t i = 0; i < numNewThreads; ++i)
             {
                 WorkerThread *newThread(new WorkerThread(mPriv->mQueue, mPriv));
                 mPriv->mActiveThreads.push_back(newThread);
             }
         }
         
-        void shrink(int threadsToKill)
+        void shrink(size_t threadsToKill)
         {
-            while (threadsToKill > 0)
-            {
-                std::vector<WorkerThread*>::iterator i = mPriv->mIdleThreads.begin();
-                if (i == mPriv->mIdleThreads.end())
-                {
-                    //We're out of idle threads to kill.
-                    break;
-                }
-                WorkerThread *doomed = *i;
-                mPriv->mIdleThreads.erase(i);
-                doomed->poke(Dead);
-                doomed->join();
-                delete doomed;
-        
-                --threadsToKill;
-            }
-        
-            while (threadsToKill > 0)
-            {
-                // If we've made it here, then it means that there weren't enough idle
-                // threads to kill. We'll need to zombify some active threads then.
-                std::vector<WorkerThread*>::iterator i = mPriv->mActiveThreads.begin();
-                if (i != mPriv->mActiveThreads.end())
-                {
-                    mPriv->mZombieThreads.push_back(*i);
-                    (*i)->poke(Zombie);
-                    mPriv->mActiveThreads.erase(i);
-                }
-                --threadsToKill;
-            }
+            size_t idleThreadsToKill = threadsToKill <= mPriv->mIdleThreads.size() ? threadsToKill : mPriv->mIdleThreads.size();
+            size_t activeThreadsToZombify = threadsToKill - idleThreadsToKill;
+
+            mPriv->killThreads(mPriv->mIdleThreads, mPriv->mIdleThreads.begin(), mPriv->mIdleThreads.begin() + idleThreadsToKill);
+            mPriv->zombifyActiveThreads(mPriv->mActiveThreads.begin(), mPriv->mActiveThreads.begin() + activeThreadsToZombify);
         }
 
-        const int mNumThreads;
+        const size_t mNumThreads;
         ThreadPoolPriv *mPriv;
     };
 

commit 0e2d0969783f84856ba601129dd8d28bae5af8a9
Author: Mark Michelson <mmichelson at digium.com>
Date:   Mon Apr 18 12:03:19 2011 -0500

    Add helper method for sending stateChanged notifications.

diff --git a/ThreadPool/src/ThreadPool.cpp b/ThreadPool/src/ThreadPool.cpp
index 9de49e3..3e05d1d 100644
--- a/ThreadPool/src/ThreadPool.cpp
+++ b/ThreadPool/src/ThreadPool.cpp
@@ -154,6 +154,14 @@ public:
         }
     }
 
+    void sendStateChanged()
+    {
+        int activeSize = mActiveThreads.size();
+        int idleSize = mIdleThreads.size();
+        int zombieSize = mZombieThreads.size();
+        mListener->stateChanged(mPool, activeSize, idleSize, zombieSize);
+    }
+
     class ActiveThreadIdle : public Work
     {
     public:
@@ -171,10 +179,7 @@ public:
                 mPriv->mIdleThreads.push_back(*iter);
                 mPriv->mActiveThreads.erase(iter);
             }
-            int activeSize = mPriv->mActiveThreads.size();
-            int idleSize = mPriv->mIdleThreads.size();
-            int zombieSize = mPriv->mZombieThreads.size();
-            mPriv->mListener->stateChanged(mPriv->mPool, activeSize, idleSize, zombieSize);
+            mPriv->sendStateChanged();
         }
     private:
         ThreadPoolPriv *mPriv;
@@ -210,10 +215,7 @@ public:
                 mPriv->mZombieThreads.erase(i);
                 delete doomed;
             }
-            int activeSize = mPriv->mActiveThreads.size();
-            int idleSize = mPriv->mIdleThreads.size();
-            int zombieSize = mPriv->mZombieThreads.size();
-            mPriv->mListener->stateChanged(mPriv->mPool, activeSize, idleSize, zombieSize);
+            mPriv->sendStateChanged();
         }
     private:
         ThreadPoolPriv *mPriv;
@@ -255,10 +257,7 @@ public:
             {
                 shrink(currentSize - mNumThreads);
             }
-            int activeSize = mPriv->mActiveThreads.size();
-            int idleSize = mPriv->mIdleThreads.size();
-            int zombieSize = mPriv->mZombieThreads.size();
-            mPriv->mListener->stateChanged(mPriv->mPool, activeSize, idleSize, zombieSize);
+            mPriv->sendStateChanged();
         }
     private:
         void grow(int numNewThreads)

commit 8b0f7cf8492427b05847467a0495b6ae3412a995
Author: Mark Michelson <mmichelson at digium.com>
Date:   Mon Apr 18 11:40:39 2011 -0500

    Add another helper function for waiting for task completion.

diff --git a/ThreadPool/test/TestThreadPool.cpp b/ThreadPool/test/TestThreadPool.cpp
index 6d43e96..ef790ce 100644
--- a/ThreadPool/test/TestThreadPool.cpp
+++ b/ThreadPool/test/TestThreadPool.cpp
@@ -39,7 +39,7 @@ public:
         mActive = active;
         mIdle = idle;
         mZombie = zombie;
-        mCond.notify_one();
+        mDone.notify_one();
     }
 
     void queueWorkAdded(const PoolPtr& pool, int count, bool wasEmpty)
@@ -66,7 +66,7 @@ public:
     bool mEmptyNotice;
 
     boost::mutex mLock;
-    boost::condition_variable mCond;
+    boost::condition_variable mDone;
 };
 
 typedef IceUtil::Handle<TestListener> TestListenerPtr;
@@ -79,12 +79,12 @@ public:
     {
         boost::unique_lock<boost::mutex> lock(mLock);
         taskExecuted = true;
-        mCond.notify_one();
+        mDone.notify_one();
     }
     bool taskExecuted;
 
     boost::mutex mLock;
-    boost::condition_variable mCond;
+    boost::condition_variable mDone;
 };
 
 typedef IceUtil::Handle<SimpleTask> SimpleTaskPtr;
@@ -123,7 +123,7 @@ typedef IceUtil::Handle<ComplexTask> ComplexTaskPtr;
     boost::unique_lock<boost::mutex> lock(listener->mLock);\
     while ((condition))\
     {\
-        listener->mCond.wait(lock);\
+        listener->mDone.wait(lock);\
     }\
 }\
 
@@ -134,6 +134,16 @@ static void pokeWorker(ComplexTaskPtr& task)
     task->mStall.notify_one();
 }
 
+template <class T>
+static void waitForCompletion(T& task)
+{
+    boost::unique_lock<boost::mutex> lock(task->mLock);
+    while (!task->taskExecuted)
+    {
+        task->mDone.wait(lock);
+    }
+}
+
 BOOST_AUTO_TEST_SUITE(ThreadPoolTest)
 
 BOOST_AUTO_TEST_CASE(addWork)
@@ -214,13 +224,7 @@ BOOST_AUTO_TEST_CASE(oneTaskOneThread)
 
     //The thread should execute the work and then
     //become idle.
-    {
-        boost::unique_lock<boost::mutex> lock(work->mLock);
-        while (!work->taskExecuted)
-        {
-            work->mCond.wait(lock);
-        }
-    }
+    waitForCompletion(work);
 
     BOOST_CHECK(work->taskExecuted == true);
     BOOST_CHECK(listener->mEmptyNotice == true);
@@ -251,13 +255,7 @@ BOOST_AUTO_TEST_CASE(oneThreadOneTask)
     //become active and execute the work.
     queue->enqueueWork(work);
 
-    {
-        boost::unique_lock<boost::mutex> lock(work->mLock);
-        while (!work->taskExecuted)
-        {
-            work->mCond.wait(lock);
-        }
-    }
+    waitForCompletion(work);
 
     BOOST_CHECK(work->taskExecuted == true);
     BOOST_CHECK(listener->mEmptyNotice == true);
@@ -293,13 +291,7 @@ BOOST_AUTO_TEST_CASE(oneThreadMultipleTasks)
     //ensured in our queue tests that execution happens in the
     //correct order, so we just need to wait for the third task
     //to be complete.
-    {
-        boost::unique_lock<boost::mutex> lock(work3->mLock);
-        while (!work3->taskExecuted)
-        {
-            work3->mCond.wait(lock);
-        }
-    }
+    waitForCompletion(work3);
 
     BOOST_CHECK(work1->taskExecuted == true);
     BOOST_CHECK(work2->taskExecuted == true);
@@ -349,19 +341,9 @@ BOOST_AUTO_TEST_CASE(taskDistribution)
     pokeWorker(work2);
 
     //Now be sure the tasks finish
-    {
-        boost::unique_lock<boost::mutex> lock1(work1->mLock);
-        boost::unique_lock<boost::mutex> lock2(work2->mLock);
-        while (!work1->taskExecuted)
-        {
-            work1->mDone.wait(lock1);
-        }
-        while (!work2->taskExecuted)
-        {
-            work2->mDone.wait(lock2);
-        }
-    }
-
+    waitForCompletion(work1);
+    waitForCompletion(work2);
+    
     BOOST_CHECK(work1->taskExecuted == true);
     BOOST_CHECK(work2->taskExecuted == true);
 
@@ -414,19 +396,9 @@ BOOST_AUTO_TEST_CASE(zombies)
     pokeWorker(work2);
 
     //Now be sure the tasks finish
-    {
-        boost::unique_lock<boost::mutex> lock1(work1->mLock);
-        boost::unique_lock<boost::mutex> lock2(work2->mLock);
-        while (!work1->taskExecuted)
-        {
-            work1->mDone.wait(lock1);
-        }
-        while (!work2->taskExecuted)
-        {
-            work2->mDone.wait(lock2);
-        }
-    }
-
+    waitForCompletion(work1);
+    waitForCompletion(work2);
+    
     BOOST_CHECK(work1->taskExecuted == true);
     BOOST_CHECK(work2->taskExecuted == true);
 
@@ -480,19 +452,9 @@ BOOST_AUTO_TEST_CASE(moreThreadDestruction)
     pokeWorker(work1);
     pokeWorker(work2);
 
-    {
-        boost::unique_lock<boost::mutex> lock1(work1->mLock);
-        boost::unique_lock<boost::mutex> lock2(work2->mLock);
-        while (!work1->taskExecuted)
-        {
-            work1->mDone.wait(lock1);
-        }
-        while (!work2->taskExecuted)
-        {
-            work2->mDone.wait(lock2);
-        }
-    }
-
+    waitForCompletion(work1);
+    waitForCompletion(work2);
+    
     BOOST_CHECK(work1->taskExecuted == true);
     BOOST_CHECK(work2->taskExecuted == true);
     

commit 7f49137d87f5d39ba167e28645d5999fb8ff824f
Author: Mark Michelson <mmichelson at digium.com>
Date:   Mon Apr 18 11:32:14 2011 -0500

    Explicitly destroy the QueueListener in the WorkQueuePriv's destructor.
    
    Every now and then a ThreadPool test would fail due to an apparent race
    condition. The problem was that the WorkQueuePriv's shared mutex was
    being destroyed while its listener thread was still (indirectly) using it.
    By explicitly destroying the listener in the destructor, we ensure it
    gets destroyed before the shared_mutex does.
    
    I reran the ThreadPool test about 30 times and saw no crashes. I easily
    would have seen 4 or 5 in that period prior to this change.

diff --git a/WorkQueue/src/WorkQueue.cpp b/WorkQueue/src/WorkQueue.cpp
index 736997e..e1a8d69 100644
--- a/WorkQueue/src/WorkQueue.cpp
+++ b/WorkQueue/src/WorkQueue.cpp
@@ -35,6 +35,14 @@ public:
     WorkQueuePriv(const QueueListenerPtr& listener)
         : mListener(listener) { }
 
+    ~WorkQueuePriv()
+    {
+        //The reason this is here is so that we ensure that
+        //the listener is destroyed before the lock or queue
+        //is.
+        mListener = 0;
+    }
+
     WorkPtr getNextTask()
     {
         boost::unique_lock<boost::shared_mutex> lock(mLock);

commit c88fc99c23a531e3c32cc244188d4c59c0cac582
Author: Mark Michelson <mmichelson at digium.com>
Date:   Mon Apr 18 11:31:40 2011 -0500

    Add helper function to poke ComplexTasks.

diff --git a/ThreadPool/test/TestThreadPool.cpp b/ThreadPool/test/TestThreadPool.cpp
index ef1fc36..6d43e96 100644
--- a/ThreadPool/test/TestThreadPool.cpp
+++ b/ThreadPool/test/TestThreadPool.cpp
@@ -127,6 +127,13 @@ typedef IceUtil::Handle<ComplexTask> ComplexTaskPtr;
     }\
 }\
 
+static void pokeWorker(ComplexTaskPtr& task)
+{
+    boost::unique_lock<boost::mutex> lock(task->mLock);
+    task->mContinue = true;
+    task->mStall.notify_one();
+}
+
 BOOST_AUTO_TEST_SUITE(ThreadPoolTest)
 
 BOOST_AUTO_TEST_CASE(addWork)
@@ -338,14 +345,8 @@ BOOST_AUTO_TEST_CASE(taskDistribution)
     BOOST_CHECK(listener->mZombie == 0);
 
     //Cool, so let's give those threads a poke
-    {
-        boost::unique_lock<boost::mutex> lock1(work1->mLock);
-        boost::unique_lock<boost::mutex> lock2(work2->mLock);
-        work1->mContinue = true;
-        work2->mContinue = true;
-        work1->mStall.notify_one();
-        work2->mStall.notify_one();
-    }
+    pokeWorker(work1);
+    pokeWorker(work2);
 
     //Now be sure the tasks finish
     {
@@ -409,14 +410,8 @@ BOOST_AUTO_TEST_CASE(zombies)
 
     //Now we should still be able to poke the work and
     //have it complete executing.
-    {
-        boost::unique_lock<boost::mutex> lock1(work1->mLock);
-        boost::unique_lock<boost::mutex> lock2(work2->mLock);
-        work1->mContinue = true;
-        work2->mContinue = true;
-        work1->mStall.notify_one();
-        work2->mStall.notify_one();
-    }
+    pokeWorker(work1);
+    pokeWorker(work2);
 
     //Now be sure the tasks finish
     {
@@ -482,14 +477,8 @@ BOOST_AUTO_TEST_CASE(moreThreadDestruction)
     BOOST_CHECK(listener->mIdle == 0);
     BOOST_CHECK(listener->mZombie == 1);
 
-    {
-        boost::unique_lock<boost::mutex> lock1(work1->mLock);
-        boost::unique_lock<boost::mutex> lock2(work2->mLock);
-        work1->mContinue = true;
-        work2->mContinue = true;
-        work1->mStall.notify_one();
-        work2->mStall.notify_one();
-    }
+    pokeWorker(work1);
+    pokeWorker(work2);
 
     {
         boost::unique_lock<boost::mutex> lock1(work1->mLock);

commit 3b857b2c97fc71a498dd7858a277509fe26b970a
Author: Mark Michelson <mmichelson at digium.com>
Date:   Mon Apr 18 09:39:50 2011 -0500

    Reduce size of the thread pool test by introducing a handy macro to wait for threads to reach a specific state.

diff --git a/ThreadPool/test/TestThreadPool.cpp b/ThreadPool/test/TestThreadPool.cpp
index 5ff1920..ef1fc36 100644
--- a/ThreadPool/test/TestThreadPool.cpp
+++ b/ThreadPool/test/TestThreadPool.cpp
@@ -96,7 +96,7 @@ public:
 
     void execute()
     {
-        //Complex tasks will start there execution
+        //Complex tasks will start their execution
         //but then halt until they are poked.
         boost::unique_lock<boost::mutex> lock(mLock);
         while (!mContinue)
@@ -118,6 +118,15 @@ public:
 
 typedef IceUtil::Handle<ComplexTask> ComplexTaskPtr;
 
+#define WAIT_FOR_THREADS(condition) \
+{\
+    boost::unique_lock<boost::mutex> lock(listener->mLock);\
+    while ((condition))\
+    {\
+        listener->mCond.wait(lock);\
+    }\
+}\
+
 BOOST_AUTO_TEST_SUITE(ThreadPoolTest)
 
 BOOST_AUTO_TEST_CASE(addWork)
@@ -152,14 +161,8 @@ BOOST_AUTO_TEST_CASE(threadCreation)
     //The thread will initially be active but will
     //turn idle nearly immediately since there is no
     //work to do.
-    {
-        boost::unique_lock<boost::mutex> lock(listener->mLock);
-        while (listener->mIdle == 0)
-        {
-            listener->mCond.wait(lock);
-        }
-    }
-
+    WAIT_FOR_THREADS(listener->mIdle == 0);
+    
     BOOST_CHECK(listener->mIdle == 1);
     BOOST_CHECK(listener->mActive == 0);
     BOOST_CHECK(listener->mZombie == 0);
@@ -169,41 +172,21 @@ BOOST_AUTO_TEST_CASE(threadDestruction)
 {
     BOOST_TEST_MESSAGE("Running threadDestruction test");
 
-    std::cout << "Really early failure?" << std::endl;
     TestListenerPtr listener(new TestListener);
     QueuePtr queue(new WorkQueue());
     PoolPtr pool(new ThreadPool(listener, queue));
     
     pool->setSize(3);
 
-    std::cout << "Early failure?" << std::endl;
-
-    {
-        boost::unique_lock<boost::mutex> lock(listener->mLock);
-        while (listener->mIdle != 3)
-        {
-            listener->mCond.wait(lock);
-        }
-    }
-
-    std::cout << "Everything cool?" << std::endl;
+    WAIT_FOR_THREADS(listener->mIdle < 3);
 
     BOOST_CHECK(listener->mIdle == 3);
     BOOST_CHECK(listener->mActive == 0);
     BOOST_CHECK(listener->mZombie == 0);
 
-    std::cout << "Everything still cool?" << std::endl;
     pool->setSize(2);
 
-    std::cout << "How about now?" << std::endl;
-    {
-        boost::unique_lock<boost::mutex> lock(listener->mLock);
-        while (listener->mIdle != 2)
-        {
-            listener->mCond.wait(lock);
-        }
-    }
-    std::cout << "And now?" << std::endl;
+    WAIT_FOR_THREADS(listener->mIdle > 2);
 
     BOOST_CHECK(listener->mIdle == 2);
     BOOST_CHECK(listener->mActive == 0);
@@ -237,14 +220,8 @@ BOOST_AUTO_TEST_CASE(oneTaskOneThread)
 
     //The thread should be idle now. Let's make sure
     //that's happening.
-    {
-        boost::unique_lock<boost::mutex> lock(listener->mLock);
-        while (listener->mIdle == 0)
-        {
-            listener->mCond.wait(lock);
-        }
-    }
-
+    WAIT_FOR_THREADS(listener->mIdle == 0);
+    
     BOOST_CHECK(listener->mIdle == 1);
     BOOST_CHECK(listener->mActive == 0);
     BOOST_CHECK(listener->mZombie == 0);
@@ -261,14 +238,8 @@ BOOST_AUTO_TEST_CASE(oneThreadOneTask)
 
     pool->setSize(1);
 
-    {
-        boost::unique_lock<boost::mutex> lock(listener->mLock);
-        while (listener->mIdle == 0)
-        {
-            listener->mCond.wait(lock);
-        }
-    }
-
+    WAIT_FOR_THREADS(listener->mIdle < 1);
+    
     //The thread is idle now. When we queue work, it should
     //become active and execute the work.
     queue->enqueueWork(work);
@@ -285,14 +256,8 @@ BOOST_AUTO_TEST_CASE(oneThreadOneTask)
     BOOST_CHECK(listener->mEmptyNotice == true);
 
     //And of course, the thread should become idle once work is done
-    {
-        boost::unique_lock<boost::mutex> lock(listener->mLock);
-        while (listener->mIdle == 0)
-        {
-            listener->mCond.wait(lock);
-        }
-    }
-
+    WAIT_FOR_THREADS(listener->mIdle < 1);
+    
     BOOST_CHECK(listener->mIdle == 1);
     BOOST_CHECK(listener->mActive == 0);
     BOOST_CHECK(listener->mZombie == 0);
@@ -334,14 +299,8 @@ BOOST_AUTO_TEST_CASE(oneThreadMultipleTasks)
     BOOST_CHECK(work3->taskExecuted == true);
 
     //And of course, the thread should become idle once work is done
-    {
-        boost::unique_lock<boost::mutex> lock(listener->mLock);
-        while (listener->mIdle == 0)
-        {
-            listener->mCond.wait(lock);
-        }
-    }
-
+    WAIT_FOR_THREADS(listener->mIdle < 1);
+    
     BOOST_CHECK(listener->mIdle == 1);
     BOOST_CHECK(listener->mActive == 0);
     BOOST_CHECK(listener->mZombie == 0);
@@ -372,15 +331,8 @@ BOOST_AUTO_TEST_CASE(taskDistribution)
     //Pool operations are asynchronous, so we need to
     //wait until the listener is informed that there are
     //two active threads
-    
-    {
-        boost::unique_lock<boost::mutex> lock(listener->mLock);
-        while (listener->mActive < 2)
-        {
-            listener->mCond.wait(lock);
-        }
-    }
-    std::cout << "how many active? " << listener->mActive << std::endl;
+    WAIT_FOR_THREADS(listener->mActive < 2);
+
     BOOST_CHECK(listener->mActive == 2);
     BOOST_CHECK(listener->mIdle == 0);
     BOOST_CHECK(listener->mZombie == 0);
@@ -413,14 +365,8 @@ BOOST_AUTO_TEST_CASE(taskDistribution)
     BOOST_CHECK(work2->taskExecuted == true);
 
     //And of course, the threads should become idle once work is done
-    {
-        boost::unique_lock<boost::mutex> lock(listener->mLock);
-        while (listener->mIdle < 2)
-        {
-            listener->mCond.wait(lock);
-        }
-    }
-
+    WAIT_FOR_THREADS(listener->mIdle < 2);
+    
     BOOST_CHECK(listener->mIdle == 2);
     BOOST_CHECK(listener->mActive == 0);
     BOOST_CHECK(listener->mZombie == 0);
@@ -444,30 +390,19 @@ BOOST_AUTO_TEST_CASE(zombies)
 
     pool->setSize(2);
     
-    {
-        boost::unique_lock<boost::mutex> lock(listener->mLock);
-        while (listener->mActive < 2)
-        {
-            listener->mCond.wait(lock);
-        }
-    }
+    WAIT_FOR_THREADS(listener->mActive < 2);
+    
     BOOST_CHECK(listener->mActive == 2);
     BOOST_CHECK(listener->mIdle == 0);
     BOOST_CHECK(listener->mZombie == 0);
 
     //Now we'll set the size down to 0. This should
-    //result in the active threads immediately becoming
-    //zombies.
+    //result in the active threads being turned to
+    //zombies
     pool->setSize(0);
 
-    {
-        boost::unique_lock<boost::mutex> lock(listener->mLock);
-        while (listener->mZombie < 2)
-        {
-            listener->mCond.wait(lock);
-        }
-    }
-    std::cout << "Number of threads. Active: " << listener->mActive << " Idle: " << listener->mIdle << " Zombie: " << listener->mZombie;
+    WAIT_FOR_THREADS(listener->mZombie < 2);
+
     BOOST_CHECK(listener->mActive == 0);
     BOOST_CHECK(listener->mIdle == 0);
     BOOST_CHECK(listener->mZombie == 2);
@@ -502,14 +437,8 @@ BOOST_AUTO_TEST_CASE(zombies)
 
     //Since the tasks finished executing, the zombie
     //threads should die.
-    {
-        boost::unique_lock<boost::mutex> lock(listener->mLock);
-        while (listener->mZombie > 0)
-        {
-            listener->mCond.wait(lock);
-        }
-    }
-
+    WAIT_FOR_THREADS(listener->mZombie > 0);
+    
     BOOST_CHECK(listener->mActive == 0);
     BOOST_CHECK(listener->mIdle == 0);
     BOOST_CHECK(listener->mZombie == 0);
@@ -536,14 +465,8 @@ BOOST_AUTO_TEST_CASE(moreThreadDestruction)
     // All threads start as active, but 2 should become
     // idle nearly immediately. We don't want to proceed
     // until we know the two threads have become idle.
-    {
-        boost::unique_lock<boost::mutex> lock(listener->mLock);
-        while (listener->mIdle < 2)
-        {
-            listener->mCond.wait(lock);
-        }
-    }
-
+    WAIT_FOR_THREADS(listener->mIdle < 2);
+    
     BOOST_CHECK(listener->mActive == 2);
     BOOST_CHECK(listener->mIdle == 2);
     BOOST_CHECK(listener->mZombie == 0);
@@ -553,14 +476,8 @@ BOOST_AUTO_TEST_CASE(moreThreadDestruction)
     //Previous state was 2 active and 2 idle threads.
     //Removing 3 threads should kill the 2 idle threads
     //and change one of the active threads to a zombie.
-    {
-        boost::unique_lock<boost::mutex> lock(listener->mLock);
-        while (listener->mIdle > 0 || listener->mZombie == 0)
-        {
-            listener->mCond.wait(lock);
-        }
-    }
-
+    WAIT_FOR_THREADS(listener->mIdle > 0 || listener->mZombie == 0);
+    
     BOOST_CHECK(listener->mActive == 1);
     BOOST_CHECK(listener->mIdle == 0);
     BOOST_CHECK(listener->mZombie == 1);
@@ -590,14 +507,8 @@ BOOST_AUTO_TEST_CASE(moreThreadDestruction)
     BOOST_CHECK(work1->taskExecuted == true);
     BOOST_CHECK(work2->taskExecuted == true);
     
-    {
-        boost::unique_lock<boost::mutex> lock(listener->mLock);
-        while (listener->mZombie > 0 || listener->mActive > 0)
-        {
-            listener->mCond.wait(lock);
-        }
-    }
-
+    WAIT_FOR_THREADS(listener->mZombie > 0 || listener->mActive > 0);
+    
     BOOST_CHECK(listener->mActive == 0);
     BOOST_CHECK(listener->mIdle == 1);
     BOOST_CHECK(listener->mZombie == 0);

-----------------------------------------------------------------------


-- 
asterisk-scf/integration/ice-util-cpp.git



More information about the asterisk-scf-commits mailing list