[asterisk-scf-commits] asterisk-scf/integration/ice-util-cpp.git branch "workqueue" updated.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Thu Apr 7 18:15:53 CDT 2011


branch "workqueue" has been updated
       via  782ecd7fa52a89e4116c9cbb9c75675e5316bc33 (commit)
      from  a823445114cf5e0b548c7467e7b8e4a43a4a6e92 (commit)

Summary of changes:
 ThreadPool/include/AsteriskSCF/ThreadPool.h |    8 +-
 ThreadPool/src/ThreadPool.cpp               |   80 ++++++++++++-----------
 ThreadPool/test/TestThreadPool.cpp          |   95 +++++++++++++++++++++++++--
 3 files changed, 135 insertions(+), 48 deletions(-)


- Log -----------------------------------------------------------------
commit 782ecd7fa52a89e4116c9cbb9c75675e5316bc33
Author: Mark Michelson <mmichelson at digium.com>
Date:   Thu Apr 7 18:12:37 2011 -0500

    At this point the tests are all passing, valgrind reports no errors, and there are no memory/thread leaks.
    
    The highlights:
    
    * Make sure to join and delete the thread objects when we're done with them.
    * Convert many shared pointers to bare ones
    
    The last functional thing I want to do before cleanup/documentation/beautification
    is to try to fix the problem where we call the pool listener's stateChanged() method
    with the pool's lock held.

diff --git a/ThreadPool/include/AsteriskSCF/ThreadPool.h b/ThreadPool/include/AsteriskSCF/ThreadPool.h
index 292c993..4911037 100644
--- a/ThreadPool/include/AsteriskSCF/ThreadPool.h
+++ b/ThreadPool/include/AsteriskSCF/ThreadPool.h
@@ -40,14 +40,14 @@ private:
 class ThreadQueueListener : public AsteriskSCF::System::WorkQueue::V1::QueueListener
 {
 public:
-    ThreadQueueListener(const boost::shared_ptr<ThreadPoolPriv>& something,
-            const AsteriskSCF::System::ThreadPool::V1::PoolPtr& pool);
+    ThreadQueueListener(ThreadPoolPriv *something,
+            AsteriskSCF::System::ThreadPool::V1::Pool *pool);
     void workAdded(bool wasEmpty);
     void workResumable();
     void emptied();
 private:
-    boost::shared_ptr<ThreadPoolPriv> mThreadPoolPriv;
-    AsteriskSCF::System::ThreadPool::V1::PoolPtr mPool;
+    ThreadPoolPriv *mThreadPoolPriv;
+    AsteriskSCF::System::ThreadPool::V1::Pool *mPool;
 };
 
 class ThreadPoolFactory : public AsteriskSCF::System::ThreadPool::V1::PoolFactory
diff --git a/ThreadPool/src/ThreadPool.cpp b/ThreadPool/src/ThreadPool.cpp
index 20ad0bd..6dcdfda 100644
--- a/ThreadPool/src/ThreadPool.cpp
+++ b/ThreadPool/src/ThreadPool.cpp
@@ -87,16 +87,25 @@ void WorkerThread::poke(ThreadState newState)
     mCond.notify_one();
 }
 
+void WorkerThread::join()
+{
+    mThread.join();
+}
+
 class ThreadQueueListener;
 
 class ThreadPoolPriv : public WorkerThreadListener
 {
 public:
-
-    ThreadPoolPriv(const PoolListenerPtr& listener, const QueuePtr& queue, const PoolPtr& pool)
+    ThreadPoolPriv(const PoolListenerPtr& listener, const QueuePtr& queue, Pool *pool)
         : mListener(listener), mQueue(queue), mPool(pool)
     {
-        mQueue->setListener(new ThreadQueueListener(boost::shared_ptr<ThreadPoolPriv> (this), mPool));
+        mQueue->setListener(new ThreadQueueListener(this, mPool));
+    }
+
+    ~ThreadPoolPriv()
+    {
+        resize(0);
     }
 
     void activeThreadIdle(WorkerThread *thread)
@@ -117,7 +126,6 @@ public:
             activeSize = mActiveThreads.size();
             idleSize = mIdleThreads.size();
             zombieSize = mZombieThreads.size();
-            std::cout << "Active Thread has become idle. Idle: " << idleSize << ". Active: " << activeSize << ". Zombie: " << zombieSize << std::endl;
             //XXX I don't like calling listener operations without a lock held since it makes
             //it much more difficult to call pool operations from the listener. The problem is
             //that by NOT calling with the lock held, state changes can arrive out of order...believe
@@ -136,60 +144,57 @@ public:
             std::vector<WorkerThread*>::iterator i = std::find(mZombieThreads.begin(), mZombieThreads.end(), thread);
             if (i != mZombieThreads.end())
             {
+                WorkerThread *doomed = *i;
                 mZombieThreads.erase(i);
-                delete *i;
+                delete doomed;
             }
             activeSize = mActiveThreads.size();
             idleSize = mIdleThreads.size();
             zombieSize = mZombieThreads.size();
-            std::cout << "Endeadened a Zombie thread. Idle: " << idleSize << ". Active: " << activeSize << ". Zombie: " << zombieSize << std::endl;
             mListener->stateChanged(mPool, activeSize, idleSize, zombieSize);
         }
     }
     
     void grow(int numNewThreads)
     {
-        std::cout << "Growing...adding " << numNewThreads << " threads" << std::endl;
-       for (int i = 0; i < numNewThreads; ++i)
-       {
-           WorkerThread *newThread(new WorkerThread(mQueue, this));
-           mActiveThreads.push_back(newThread);
-       }
+        for (int i = 0; i < numNewThreads; ++i)
+        {
+            WorkerThread *newThread(new WorkerThread(mQueue, this));
+            mActiveThreads.push_back(newThread);
+        }
     }
     
     void shrink(int threadsToKill)
     {
-        std::cout << "Shrinking...removing " << threadsToKill << " threads" << std::endl;
-        for (std::vector<WorkerThread*>::iterator i = mIdleThreads.begin();
-                i != mIdleThreads.end(); ++i)
+        while (threadsToKill > 0)
         {
-            mIdleThreads.erase(i);
-            (*i)->poke(Dead);
-            delete *i;
-    
-            if (--threadsToKill == 0)
+            std::vector<WorkerThread*>::iterator i = mIdleThreads.begin();
+            if (i == mIdleThreads.end())
             {
-                return;
+                //We're out of idle threads to kill.
+                break;
             }
+            WorkerThread *doomed = *i;
+            mIdleThreads.erase(i);
+            doomed->poke(Dead);
+            doomed->join();
+            delete doomed;
+    
+            --threadsToKill;
         }
     
-        // If we've made it here, then it means that there weren't enough idle
-        // threads to kill. We'll need to zombify some active threads then.
-        
-        std::cout << "Not enough idle threads to kill. About to zombify " << threadsToKill << " threads" << std::endl;
-        for (std::vector<WorkerThread*>::iterator i = mActiveThreads.begin();
-                i != mActiveThreads.end(); ++i)
+        while (threadsToKill > 0)
         {
-            //Active threads, on the other hand, need to at least temporarily be
-            //pushed into the zombie container.
-            mZombieThreads.push_back(*i);
-            (*i)->poke(Zombie);
-            mActiveThreads.erase(i);
-    
-            if (--threadsToKill == 0)
+            // If we've made it here, then it means that there weren't enough idle
+            // threads to kill. We'll need to zombify some active threads then.
+            std::vector<WorkerThread*>::iterator i = mActiveThreads.begin();
+            if (i != mActiveThreads.end())
             {
-                return;
+                mZombieThreads.push_back(*i);
+                (*i)->poke(Zombie);
+                mActiveThreads.erase(i);
             }
+            --threadsToKill;
         }
     }
 
@@ -219,14 +224,13 @@ public:
             activeSize = mActiveThreads.size();
             idleSize = mIdleThreads.size();
             zombieSize = mZombieThreads.size();
-            std::cout << "Finished resizing. Idle: " << idleSize << ". Active: " << activeSize << ". Zombie: " << zombieSize << std::endl;
             mListener->stateChanged(mPool, activeSize, idleSize, zombieSize);
         }
     }
 
     AsteriskSCF::System::ThreadPool::V1::PoolListenerPtr mListener;
     AsteriskSCF::System::WorkQueue::V1::QueuePtr mQueue;
-    AsteriskSCF::System::ThreadPool::V1::PoolPtr mPool;
+    AsteriskSCF::System::ThreadPool::V1::Pool *mPool;
     
     std::vector<WorkerThread*> mActiveThreads;
     std::vector<WorkerThread*> mIdleThreads;
@@ -235,7 +239,7 @@ public:
     boost::mutex mLock;
 };
 
-ThreadQueueListener::ThreadQueueListener(const boost::shared_ptr<ThreadPoolPriv>& something, const PoolPtr& pool)
+ThreadQueueListener::ThreadQueueListener(ThreadPoolPriv *something, Pool *pool)
     : mThreadPoolPriv(something), mPool(pool)
 {
 }
diff --git a/ThreadPool/test/TestThreadPool.cpp b/ThreadPool/test/TestThreadPool.cpp
index 55ba29b..966515b 100644
--- a/ThreadPool/test/TestThreadPool.cpp
+++ b/ThreadPool/test/TestThreadPool.cpp
@@ -39,7 +39,6 @@ public:
         mActive = active;
         mIdle = idle;
         mZombie = zombie;
-        std::cout << "Got stateChanged message: idle: " << mIdle << ". active: " << mActive << ". zombie: " << mZombie << std::endl;
         mCond.notify_one();
     }
 
@@ -174,8 +173,6 @@ BOOST_AUTO_TEST_CASE(threadDestruction)
     QueuePtr queue(new WorkQueue());
     PoolPtr pool(new ThreadPool(listener, queue));
     
-    std::cout << "Initializing 3 threads for thread pool" << std::endl;
-
     pool->setSize(3);
 
     {
@@ -190,8 +187,6 @@ BOOST_AUTO_TEST_CASE(threadDestruction)
     BOOST_CHECK(listener->mActive == 0);
     BOOST_CHECK(listener->mZombie == 0);
 
-    std::cout << "Shrinking thread pool to 2 threads" << std::endl;
-
     pool->setSize(2);
 
     {
@@ -429,7 +424,7 @@ BOOST_AUTO_TEST_CASE(zombies)
     pool->setSize(2);
     
     //Since these tasks halt until they are poked,
-    //The two tasks should be evenly divided amongst
+    //The two tasks should be evenly divided among
     //the threads.
     BOOST_CHECK(listener->mActive == 2);
     BOOST_CHECK(listener->mIdle == 0);
@@ -486,4 +481,92 @@ BOOST_AUTO_TEST_CASE(zombies)
     BOOST_CHECK(listener->mZombie == 0);
 }
 
+BOOST_AUTO_TEST_CASE(moreThreadDestruction)
+{
+    BOOST_TEST_MESSAGE("Running moreThreadDestruction test");
+
+    TestListenerPtr listener(new TestListener);
+    QueuePtr queue(new WorkQueue());
+    PoolPtr pool(new ThreadPool(listener, queue));
+    ComplexTaskPtr work1(new ComplexTask());
+    ComplexTaskPtr work2(new ComplexTask());
+    WorkSeq works;
+
+    works.push_back(work1);
+    works.push_back(work2);
+
+    queue->enqueueWorkSeq(works);
+
+    pool->setSize(4);
+
+    // All threads start as active, but 2 should become
+    // idle nearly immediately. We don't want to proceed
+    // until we know the two threads have become idle.
+    {
+        boost::unique_lock<boost::mutex> lock(listener->mLock);
+        while (listener->mIdle < 2)
+        {
+            listener->mCond.wait(lock);
+        }
+    }
+
+    BOOST_CHECK(listener->mActive == 2);
+    BOOST_CHECK(listener->mIdle == 2);
+    BOOST_CHECK(listener->mZombie == 0);
+
+    pool->setSize(1);
+
+    //Previous state was 2 active and 2 idle threads.
+    //Removing 3 threads should kill the 2 idle threads
+    //and change one of the active threads to a zombie.
+    {
+        boost::unique_lock<boost::mutex> lock(listener->mLock);
+        while (listener->mIdle > 0 || listener->mZombie == 0)
+        {
+            listener->mCond.wait(lock);
+        }
+    }
+
+    BOOST_CHECK(listener->mActive == 1);
+    BOOST_CHECK(listener->mIdle == 0);
+    BOOST_CHECK(listener->mZombie == 1);
+
+    {
+        boost::unique_lock<boost::mutex> lock1(work1->mLock);
+        boost::unique_lock<boost::mutex> lock2(work2->mLock);
+        work1->mContinue = true;
+        work2->mContinue = true;
+        work1->mStall.notify_one();
+        work2->mStall.notify_one();
+    }
+
+    {
+        boost::unique_lock<boost::mutex> lock1(work1->mLock);
+        boost::unique_lock<boost::mutex> lock2(work2->mLock);
+        while (!work1->taskExecuted)
+        {
+            work1->mDone.wait(lock1);
+        }
+        while (!work2->taskExecuted)
+        {
+            work2->mDone.wait(lock2);
+        }
+    }
+
+    BOOST_CHECK(work1->taskExecuted == true);
+    BOOST_CHECK(work2->taskExecuted == true);
+    
+    {
+        boost::unique_lock<boost::mutex> lock(listener->mLock);
+        while (listener->mZombie > 0 || listener->mActive > 0)
+        {
+            listener->mCond.wait(lock);
+        }
+    }
+
+    BOOST_CHECK(listener->mActive == 0);
+    BOOST_CHECK(listener->mIdle == 1);
+    BOOST_CHECK(listener->mZombie == 0);
+}
+
 BOOST_AUTO_TEST_SUITE_END()

-----------------------------------------------------------------------


-- 
asterisk-scf/integration/ice-util-cpp.git



More information about the asterisk-scf-commits mailing list