[asterisk-scf-commits] asterisk-scf/integration/ice-util-cpp.git branch "workqueue" updated.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Tue Apr 5 17:30:27 CDT 2011


branch "workqueue" has been updated
       via  2ccdcf9806863b2cfe34397f87714b270bd74961 (commit)
       via  eafaf4cde70b6c8af586082bd9ab07abba9f9d87 (commit)
       via  9aa63ed4e16296c5107543bc83ae7e4a02216be8 (commit)
      from  37097a645109484af116b30c53a92994d2c4c262 (commit)

Summary of changes:
 CMakeLists.txt                              |    1 +
 ThreadPool/CMakeLists.txt                   |    1 -
 ThreadPool/include/AsteriskSCF/ThreadPool.h |   31 ++--
 ThreadPool/src/CMakeLists.txt               |    2 +
 ThreadPool/src/ThreadPool.cpp               |  331 ++++++++++++++++-----------
 5 files changed, 212 insertions(+), 154 deletions(-)


- Log -----------------------------------------------------------------
commit 2ccdcf9806863b2cfe34397f87714b270bd74961
Merge: eafaf4c 37097a6
Author: Mark Michelson <mmichelson at digium.com>
Date:   Tue Apr 5 17:29:56 2011 -0500

    Merge branch 'workqueue' of git.asterisk.org:asterisk-scf/integration/ice-util-cpp into workqueue


commit eafaf4cde70b6c8af586082bd9ab07abba9f9d87
Author: Mark Michelson <mmichelson at digium.com>
Date:   Tue Apr 5 17:28:09 2011 -0500

    A compiling and potentially correct version of the thread pool.
    
    Tests are planned, now to write the suckers.

diff --git a/ThreadPool/include/AsteriskSCF/ThreadPool.h b/ThreadPool/include/AsteriskSCF/ThreadPool.h
index bb3980e..70fad04 100644
--- a/ThreadPool/include/AsteriskSCF/ThreadPool.h
+++ b/ThreadPool/include/AsteriskSCF/ThreadPool.h
@@ -25,7 +25,7 @@ namespace AsteriskSCF
 namespace ThreadPool
 {
 
-class WorkerThread;
+class ThreadPoolPriv;
 
 class ThreadPool : public AsteriskSCF::System::ThreadPool::V1::Pool, public AsteriskSCF::System::WorkQueue::V1::QueueListener
 {
@@ -33,27 +33,23 @@ public:
     ThreadPool(const AsteriskSCF::System::ThreadPool::V1::PoolListenerPtr& listener, const AsteriskSCF::System::WorkQueue::V1::QueuePtr& queue);
     void setSize(::Ice::Int);
     AsteriskSCF::System::WorkQueue::V1::QueuePtr getQueue();
-    void activeThreadIdle(const boost::shared_ptr<WorkerThread> &thread);
-    void zombieThreadDead(const boost::shared_ptr<WorkerThread> &thread);
+private:
+    boost::shared_ptr<ThreadPoolPriv> mThreadPoolPriv;
+};
 
-    //QueueListener overloads
+class ThreadQueueListener : public AsteriskSCF::System::WorkQueue::V1::QueueListener
+{
+public:
+    ThreadQueueListener(const boost::shared_ptr<ThreadPoolPriv>& something,
+            const AsteriskSCF::System::ThreadPool::V1::PoolPtr& pool);
     void workAdded(bool wasEmpty);
     void workResumable();
     void emptied();
 private:
-    void grow(int numNewThreads);
-    void shrink(int threadsToKill);
-
-    AsteriskSCF::System::WorkQueue::V1::QueuePtr mQueue;
-    AsteriskSCF::System::ThreadPool::V1::PoolListenerPtr mListener;
-    
-    std::vector<boost::shared_ptr<WorkerThread> > mActiveThreads;
-    std::vector<boost::shared_ptr<WorkerThread> > mIdleThreads;
-    std::vector<boost::shared_ptr<WorkerThread> > mZombieThreads;
+    boost::shared_ptr<ThreadPoolPriv> mThreadPoolPriv;
+    AsteriskSCF::System::ThreadPool::V1::PoolPtr mPool;
 };
 
-typedef IceUtil::Handle<ThreadPool> ThreadPoolPtr;
-
 class ThreadPoolFactory : public AsteriskSCF::System::ThreadPool::V1::PoolFactory
 {
 public:
diff --git a/ThreadPool/src/CMakeLists.txt b/ThreadPool/src/CMakeLists.txt
index d26144c..b7e1aee 100644
--- a/ThreadPool/src/CMakeLists.txt
+++ b/ThreadPool/src/CMakeLists.txt
@@ -15,6 +15,8 @@ include_directories(${API_INCLUDE_DIR})
 
 asterisk_scf_component_add_file(ThreadPool
     ../include/AsteriskSCF/ThreadPool.h)
+asterisk_scf_component_add_file(ThreadPool
+    ../include/AsteriskSCF/WorkerThread.h)
 asterisk_scf_component_add_file(ThreadPool ThreadPool.cpp)
 asterisk_scf_component_add_boost_libraries(ThreadPool thread)
 
diff --git a/ThreadPool/src/ThreadPool.cpp b/ThreadPool/src/ThreadPool.cpp
index 1f05af8..22bdcaf 100644
--- a/ThreadPool/src/ThreadPool.cpp
+++ b/ThreadPool/src/ThreadPool.cpp
@@ -17,6 +17,7 @@
 #include <boost/thread.hpp>
 
 #include <AsteriskSCF/ThreadPool.h>
+#include <AsteriskSCF/WorkerThread.h>
 
 namespace AsteriskSCF
 {
@@ -26,36 +27,34 @@ namespace ThreadPool
 using namespace AsteriskSCF::System::ThreadPool::V1;
 using namespace AsteriskSCF::System::WorkQueue::V1;
 
-class WorkerThread
-{
-public:
-    WorkerThread(const QueuePtr& workQueue)
-        : mState(Active), mQueue(workQueue), mThread(boost::bind(&WorkerThread::active, this)) { }
+WorkerThread::WorkerThread(const QueuePtr& workQueue, const boost::shared_ptr<WorkerThreadListener>& listener)
+    : mState(Active), mListener(listener), mQueue(workQueue), mThread(boost::bind(&WorkerThread::active, this)) { }
 
-    void active()
+void WorkerThread::active()
+{
+    while (mState == Active)
     {
-        while (mState == Active)
+        if (!mQueue->executeWork())
         {
-            if (!mQueue->executeWork())
-            {
-                idle();
-            }
+            idle();
         }
+    }
 
-        // Reaching this portion means the thread is
-        // on death's door. It may have been killed while
-        // it was idle, in which case it can just die
-        // peacefully. If it's a zombie, though, then
-        // it needs to let the ThreadPoolImpl know so
-        // that the thread can be removed from the
-        // vector of zombie threads.
-        if (mState == Zombie)
-        {
-            mPool->zombieThreadDead(boost::shared_ptr<WorkerThread>(this));
-        }
+    // Reaching this portion means the thread is
+    // on death's door. It may have been killed while
+    // it was idle, in which case it can just die
+    // peacefully. If it's a zombie, though, then
+    // it needs to let the ThreadPoolImpl know so
+    // that the thread can be removed from the
+    // vector of zombie threads.
+    if (mState == Zombie)
+    {
+        mListener->zombieThreadDead(boost::shared_ptr<WorkerThread>(this));
     }
+}
 
-    void idle()
+void WorkerThread::idle()
+{
     {
         boost::unique_lock<boost::mutex> lock(mLock);
         // If we've been turned into a zombie while we
@@ -68,142 +67,174 @@ public:
         // Otherwise, we'll set ourselves idle and wait
         // for a poke
         mState = Idle;
-        mPool->activeThreadIdle(boost::shared_ptr<WorkerThread>(this));
-        mCond.wait(lock);
     }
 
-    enum ThreadState
-    {
-        /**
-         * Actively doing work
-         */
-        Active,
-        /**
-         * Nothing to do; waiting to be poked
-         */
-        Idle,
-        /**
-         * Marked for deletion. May still be executing
-         * code but next time it checks its state, it
-         * will die.
-         *
-         * Basically what happens when we try to kill
-         * a thread when it is Active
-         */
-        Zombie,
-        /**
-         * The ThreadPoolImpl considers this thread to
-         * be gone. The thread just needs to get out of
-         * the way ASAP.
-         *
-         * Basically what happens when we try to kill
-         * a thread when it is Idle
-         */
-        Dead
-    } mState;
-
-    void poke(ThreadState newState)
+    mListener->activeThreadIdle(boost::shared_ptr<WorkerThread>(this));
+
     {
         boost::unique_lock<boost::mutex> lock(mLock);
-        mState = newState;
-        mCond.notify_one();
+        while (mState == Idle)
+        {
+            mCond.wait(lock);
+        }
     }
+}
 
-    QueuePtr mQueue;
-    boost::thread mThread;
-    boost::condition_variable mCond;
-    boost::mutex mLock;
-    ThreadPoolPtr mPool;
-};
-
-void ThreadPool::grow(int numNewThreads)
+void WorkerThread::poke(ThreadState newState)
 {
-    for (int i = 0; i < numNewThreads; ++i)
-    {
-        boost::shared_ptr<WorkerThread> newThread(new WorkerThread(mQueue));
-        mActiveThreads.push_back(newThread);
-    }
+    boost::unique_lock<boost::mutex> lock(mLock);
+    mState = newState;
+    mCond.notify_one();
 }
 
-void ThreadPool::shrink(int threadsToKill)
+class ThreadQueueListener;
+
+class ThreadPoolPriv : public WorkerThreadListener
 {
-    for (std::vector<boost::shared_ptr<WorkerThread> >::iterator i = mIdleThreads.begin();
-            i != mIdleThreads.end(); ++i)
+public:
+
+    ThreadPoolPriv(const PoolListenerPtr& listener, const QueuePtr& queue, const PoolPtr& pool)
+        : mListener(listener), mQueue(queue), mPool(pool)
     {
-        mIdleThreads.erase(i);
-        (*i)->poke(WorkerThread::Dead);
+        mQueue->setListener(new ThreadQueueListener(boost::shared_ptr<ThreadPoolPriv> (this), mPool));
+    }
 
-        if (--threadsToKill == 0)
+    void activeThreadIdle(const boost::shared_ptr<WorkerThread> &thread)
+    {
+        int activeSize;
+        int idleSize;
+        int zombieSize;
         {
-            return;
+            boost::unique_lock<boost::mutex> lock(mLock);
+            std::vector<boost::shared_ptr<WorkerThread> >::iterator iter =
+                std::find(mActiveThreads.begin(), mActiveThreads.end(), thread);
+    
+            if (iter != mActiveThreads.end())
+            {
+                mIdleThreads.push_back(*iter);
+                mActiveThreads.erase(iter);
+            }
+            activeSize = mActiveThreads.size();
+            idleSize = mIdleThreads.size();
+            zombieSize = mZombieThreads.size();
         }
+        mListener->stateChanged(mPool, activeSize, idleSize, zombieSize);
     }
-
-    // If we've made it here, then it means that there weren't enough idle
-    // threads to kill. We'll need to zombify some active threads then.
-    for (std::vector<boost::shared_ptr<WorkerThread> >::iterator i = mActiveThreads.begin();
-            i != mActiveThreads.end(); ++i)
+    
+    void zombieThreadDead(const boost::shared_ptr<WorkerThread> &thread)
     {
-        //Active threads, on the other hand, need to at least temporarily be
-        //pushed into the zombie container.
-        mZombieThreads.push_back(*i);
-        mActiveThreads.erase(i);
-        (*i)->poke(WorkerThread::Zombie);
-
-        if (--threadsToKill == 0)
+        int activeSize;
+        int idleSize;
+        int zombieSize;
         {
-            return;
+            boost::unique_lock<boost::mutex> lock(mLock);
+            mZombieThreads.erase(std::find(mActiveThreads.begin(), mActiveThreads.end(), thread));
+            activeSize = mActiveThreads.size();
+            idleSize = mIdleThreads.size();
+            zombieSize = mZombieThreads.size();
         }
+        mListener->stateChanged(mPool, activeSize, idleSize, zombieSize);
     }
-}
-
-void ThreadPool::activeThreadIdle(const boost::shared_ptr<WorkerThread> &thread)
-{
-    std::vector<boost::shared_ptr<WorkerThread> >::iterator iter =
-        std::find(mActiveThreads.begin(), mActiveThreads.end(), thread);
-
-    if (iter != mActiveThreads.end())
+    
+    void grow(int numNewThreads)
     {
-        mIdleThreads.push_back(*iter);
-        mActiveThreads.erase(iter);
+        int activeSize;
+        int idleSize;
+        int zombieSize;
+        {
+            boost::unique_lock<boost::mutex> lock(mLock);
+            for (int i = 0; i < numNewThreads; ++i)
+            {
+                boost::shared_ptr<WorkerThread> newThread(new WorkerThread(mQueue, boost::shared_ptr<ThreadPoolPriv>(this)));
+                mActiveThreads.push_back(newThread);
+            }
+            activeSize = mActiveThreads.size();
+            idleSize = mIdleThreads.size();
+            zombieSize = mZombieThreads.size();
+        }
+        mListener->stateChanged(mPool, activeSize, idleSize, zombieSize);
     }
-    mListener->stateChanged(this, mActiveThreads.size(), mIdleThreads.size(), mZombieThreads.size());
-}
-
-void ThreadPool::zombieThreadDead(const boost::shared_ptr<WorkerThread> &thread)
-{
-    mZombieThreads.erase(std::find(mActiveThreads.begin(), mActiveThreads.end(), thread));
-    mListener->stateChanged(this, mActiveThreads.size(), mIdleThreads.size(), mZombieThreads.size());
-}
-
-ThreadPool::ThreadPool(const PoolListenerPtr& listener, const QueuePtr& queue)
-    : mQueue(queue), mListener(listener)
-{
-    queue->setListener(this);
-}
-
-void ThreadPool::setSize(int size)
-{
-    int currentSize = mActiveThreads.size() + mIdleThreads.size() + mZombieThreads.size();
-    if (size < currentSize)
+    
+    void shrink(int threadsToKill)
     {
-        shrink(currentSize - size);
+        boost::unique_lock<boost::mutex> lock(mLock);
+        for (std::vector<boost::shared_ptr<WorkerThread> >::iterator i = mIdleThreads.begin();
+                i != mIdleThreads.end(); ++i)
+        {
+            mIdleThreads.erase(i);
+            (*i)->poke(Dead);
+    
+            if (--threadsToKill == 0)
+            {
+                return;
+            }
+        }
+    
+        // If we've made it here, then it means that there weren't enough idle
+        // threads to kill. We'll need to zombify some active threads then.
+        for (std::vector<boost::shared_ptr<WorkerThread> >::iterator i = mActiveThreads.begin();
+                i != mActiveThreads.end(); ++i)
+        {
+            //Active threads, on the other hand, need to at least temporarily be
+            //pushed into the zombie container.
+            mZombieThreads.push_back(*i);
+            mActiveThreads.erase(i);
+            (*i)->poke(Zombie);
+    
+            if (--threadsToKill == 0)
+            {
+                return;
+            }
+        }
     }
-    else
+
+    void resize(int numThreads)
     {
-        grow(size - currentSize);
+        int activeSize;
+        int idleSize;
+        int zombieSize;
+        {
+            boost::unique_lock<boost::mutex> lock(mLock);
+            //We don't count zombie threads as being "live" when potentially resizing
+            int currentSize = mActiveThreads.size() + mIdleThreads.size();
+
+            if (currentSize == numThreads)
+            {
+                return;
+            }
+
+            if (currentSize < numThreads)
+            {
+                grow(numThreads - currentSize);
+            }
+            else
+            {
+                shrink(currentSize - numThreads);
+            }
+            activeSize = mActiveThreads.size();
+            idleSize = mIdleThreads.size();
+            zombieSize = mZombieThreads.size();
+        }
+        mListener->stateChanged(mPool, activeSize, idleSize, zombieSize);
     }
-    mListener->stateChanged(this, mActiveThreads.size(), mIdleThreads.size(), mZombieThreads.size());
-}
 
-QueuePtr ThreadPool::getQueue()
+    AsteriskSCF::System::ThreadPool::V1::PoolListenerPtr mListener;
+    AsteriskSCF::System::WorkQueue::V1::QueuePtr mQueue;
+    AsteriskSCF::System::ThreadPool::V1::PoolPtr mPool;
+    
+    std::vector<boost::shared_ptr<WorkerThread> > mActiveThreads;
+    std::vector<boost::shared_ptr<WorkerThread> > mIdleThreads;
+    std::vector<boost::shared_ptr<WorkerThread> > mZombieThreads;
+
+    boost::mutex mLock;
+};
+
+ThreadQueueListener::ThreadQueueListener(const boost::shared_ptr<ThreadPoolPriv>& something, const PoolPtr& pool)
+    : mThreadPoolPriv(something), mPool(pool)
 {
-    return mQueue;
 }
 
-//QueueListener overloads
-
-void ThreadPool::workAdded(bool wasEmpty)
+void ThreadQueueListener::workAdded(bool wasEmpty)
 {
     // XXX The interface for PoolListener says that the second parameter of this
     // method should be the amount of new items added. Well, that's just a pain to
@@ -213,28 +244,48 @@ void ThreadPool::workAdded(bool wasEmpty)
     // If we want to report the amount of new work, then I would strongly suggest
     // changing the QueueListener's workAdded method to include the amount of new
     // work added.
-    mListener->queueWorkAdded(this, mQueue->getSize(), wasEmpty);
+    mThreadPoolPriv->mListener->queueWorkAdded(mPool, mThreadPoolPriv->mQueue->getSize(), wasEmpty);
     // XXX This could potentially stand to be more sophisticated, but poking all the
     // idle threads will get the job done too.
     //
     // A potential alternative would be to poke a number of idle threads equal to the
     // new work count.
-    for (std::vector<boost::shared_ptr<WorkerThread> >::iterator i = mIdleThreads.begin();
-            i != mIdleThreads.end(); ++i)
+    for (std::vector<boost::shared_ptr<WorkerThread> >::iterator i = mThreadPoolPriv->mIdleThreads.begin();
+            i != mThreadPoolPriv->mIdleThreads.end(); ++i)
     {
-        (*i)->poke(WorkerThread::Active);
+        (*i)->poke(Active);
     }
 }
-void ThreadPool::workResumable()
+
+void ThreadQueueListener::workResumable()
 {
-    /* This should never be called since we use a standard
-     * work queue in the thread pool.
+    /* This should never be called since the ThreadPool does not
+     * use a SuspendableQueue
      */
     assert(false);
 }
-void ThreadPool::emptied()
+
+void ThreadQueueListener::emptied()
+{
+    mThreadPoolPriv->mListener->queueEmptied(mPool);
+}
+
+ThreadPool::ThreadPool(const PoolListenerPtr& listener, const QueuePtr& queue)
+    : mThreadPoolPriv(new ThreadPoolPriv(listener, queue, this))
+{
+}
+
+void ThreadPool::setSize(int size)
+{
+    if (size >= 0)
+    {
+        mThreadPoolPriv->resize(size);
+    }
+}
+
+QueuePtr ThreadPool::getQueue()
 {
-    mListener->queueEmptied(this);
+    return mThreadPoolPriv->mQueue;
 }
 
 }; // end namespace ThreadPool

commit 9aa63ed4e16296c5107543bc83ae7e4a02216be8
Author: Mark Michelson <mmichelson at digium.com>
Date:   Thu Mar 31 17:42:21 2011 -0500

    Get Threadpool code compiling.
    
    There are still tons of issues with it, but I'll address them
    later.

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 21a57d0..b728bab 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -7,3 +7,4 @@ add_subdirectory(SmartProxy)
 add_subdirectory(StateReplicator)
 add_subdirectory(AmiCollector)
 add_subdirectory(WorkQueue)
+add_subdirectory(ThreadPool)
diff --git a/ThreadPool/CMakeLists.txt b/ThreadPool/CMakeLists.txt
index b1a0931..5e888f0 100644
--- a/ThreadPool/CMakeLists.txt
+++ b/ThreadPool/CMakeLists.txt
@@ -7,4 +7,3 @@
 #
 
 add_subdirectory(src)
-add_subdirectory(test)
diff --git a/ThreadPool/include/AsteriskSCF/ThreadPool.h b/ThreadPool/include/AsteriskSCF/ThreadPool.h
index ebe3135..bb3980e 100644
--- a/ThreadPool/include/AsteriskSCF/ThreadPool.h
+++ b/ThreadPool/include/AsteriskSCF/ThreadPool.h
@@ -18,6 +18,7 @@
 #include <boost/shared_ptr.hpp>
 
 #include <AsteriskSCF/System/ThreadPool/ThreadPoolIf.h>
+#include <AsteriskSCF/System/WorkQueue/WorkQueueIf.h>
 
 namespace AsteriskSCF
 {
@@ -29,24 +30,30 @@ class WorkerThread;
 class ThreadPool : public AsteriskSCF::System::ThreadPool::V1::Pool, public AsteriskSCF::System::WorkQueue::V1::QueueListener
 {
 public:
-    ThreadPool();
-    ThreadPool(int size);
+    ThreadPool(const AsteriskSCF::System::ThreadPool::V1::PoolListenerPtr& listener, const AsteriskSCF::System::WorkQueue::V1::QueuePtr& queue);
     void setSize(::Ice::Int);
     AsteriskSCF::System::WorkQueue::V1::QueuePtr getQueue();
+    void activeThreadIdle(const boost::shared_ptr<WorkerThread> &thread);
+    void zombieThreadDead(const boost::shared_ptr<WorkerThread> &thread);
+
+    //QueueListener overloads
+    void workAdded(bool wasEmpty);
+    void workResumable();
+    void emptied();
 private:
     void grow(int numNewThreads);
     void shrink(int threadsToKill);
-    void activeThreadIdle(const boost::shared_ptr<WorkerThread> &thread);
-    void ThreadPool::zombieThreadDead(const boost::shared_ptr<WorkerThread> &thread);
 
-    AsteriskSCF::System::ThreadPool::V1::PoolListenerPtr mListener;
     AsteriskSCF::System::WorkQueue::V1::QueuePtr mQueue;
+    AsteriskSCF::System::ThreadPool::V1::PoolListenerPtr mListener;
     
     std::vector<boost::shared_ptr<WorkerThread> > mActiveThreads;
     std::vector<boost::shared_ptr<WorkerThread> > mIdleThreads;
     std::vector<boost::shared_ptr<WorkerThread> > mZombieThreads;
 };
 
+typedef IceUtil::Handle<ThreadPool> ThreadPoolPtr;
+
 class ThreadPoolFactory : public AsteriskSCF::System::ThreadPool::V1::PoolFactory
 {
 public:
diff --git a/ThreadPool/src/ThreadPool.cpp b/ThreadPool/src/ThreadPool.cpp
index b64d705..1f05af8 100644
--- a/ThreadPool/src/ThreadPool.cpp
+++ b/ThreadPool/src/ThreadPool.cpp
@@ -14,8 +14,9 @@
  * at the top of the source tree.
  */
 
+#include <boost/thread.hpp>
+
 #include <AsteriskSCF/ThreadPool.h>
-#include <AsteriskSCF/WorkQueue.h>
 
 namespace AsteriskSCF
 {
@@ -29,7 +30,7 @@ class WorkerThread
 {
 public:
     WorkerThread(const QueuePtr& workQueue)
-        : mQueue(workQueue), mState(Active), mThread(boost::bind(&WorkerThread::active, this)) { }
+        : mState(Active), mQueue(workQueue), mThread(boost::bind(&WorkerThread::active, this)) { }
 
     void active()
     {
@@ -50,7 +51,7 @@ public:
         // vector of zombie threads.
         if (mState == Zombie)
         {
-            mPool->zombieThreadDead(this);
+            mPool->zombieThreadDead(boost::shared_ptr<WorkerThread>(this));
         }
     }
 
@@ -67,17 +68,10 @@ public:
         // Otherwise, we'll set ourselves idle and wait
         // for a poke
         mState = Idle;
-        mPool->activeThreadIdle(this);
+        mPool->activeThreadIdle(boost::shared_ptr<WorkerThread>(this));
         mCond.wait(lock);
     }
 
-    void poke(ThreadState newState)
-    {
-        boost::unique_lock<boost::mutex> lock(mLock);
-        mState = newState;
-        mCond.notify_one();
-    }
-
     enum ThreadState
     {
         /**
@@ -108,10 +102,18 @@ public:
         Dead
     } mState;
 
+    void poke(ThreadState newState)
+    {
+        boost::unique_lock<boost::mutex> lock(mLock);
+        mState = newState;
+        mCond.notify_one();
+    }
+
     QueuePtr mQueue;
     boost::thread mThread;
     boost::condition_variable mCond;
     boost::mutex mLock;
+    ThreadPoolPtr mPool;
 };
 
 void ThreadPool::grow(int numNewThreads)
@@ -140,7 +142,7 @@ void ThreadPool::shrink(int threadsToKill)
     // If we've made it here, then it means that there weren't enough idle
     // threads to kill. We'll need to zombify some active threads then.
     for (std::vector<boost::shared_ptr<WorkerThread> >::iterator i = mActiveThreads.begin();
-            i != mThreads.end(); ++i)
+            i != mActiveThreads.end(); ++i)
     {
         //Active threads, on the other hand, need to at least temporarily be
         //pushed into the zombie container.
@@ -165,19 +167,19 @@ void ThreadPool::activeThreadIdle(const boost::shared_ptr<WorkerThread> &thread)
         mIdleThreads.push_back(*iter);
         mActiveThreads.erase(iter);
     }
-    mListener->stateChanged(mActiveThreads.size(), mIdleThreads.size(), mZombieThreads.size());
+    mListener->stateChanged(this, mActiveThreads.size(), mIdleThreads.size(), mZombieThreads.size());
 }
 
 void ThreadPool::zombieThreadDead(const boost::shared_ptr<WorkerThread> &thread)
 {
     mZombieThreads.erase(std::find(mActiveThreads.begin(), mActiveThreads.end(), thread));
-    mListener->stateChanged(mActiveThreads.size(), mIdleThreads.size(), mZombieThreads.size());
+    mListener->stateChanged(this, mActiveThreads.size(), mIdleThreads.size(), mZombieThreads.size());
 }
 
-ThreadPool::ThreadPool(const ThreadPoolListenerPtr &listener, const QueuePtr &queue)
-    : mQueue(queue), mListener(listener), mQueueListener(new ThreadPoolQueueListener(this))
+ThreadPool::ThreadPool(const PoolListenerPtr& listener, const QueuePtr& queue)
+    : mQueue(queue), mListener(listener)
 {
-    queue->setListener(mQueue);
+    queue->setListener(this);
 }
 
 void ThreadPool::setSize(int size)
@@ -211,7 +213,7 @@ void ThreadPool::workAdded(bool wasEmpty)
     // If we want to report the amount of new work, then I would strongly suggest
     // changing the QueueListener's workAdded method to include the amount of new
     // work added.
-    mListener->queueWorkAdded(this, mQueue->workCount(), wasEmpty);
+    mListener->queueWorkAdded(this, mQueue->getSize(), wasEmpty);
     // XXX This could potentially stand to be more sophisticated, but poking all the
     // idle threads will get the job done too.
     //
@@ -232,7 +234,7 @@ void ThreadPool::workResumable()
 }
 void ThreadPool::emptied()
 {
-    mListener->queueEmptied(this)
+    mListener->queueEmptied(this);
 }
 
 }; // end namespace ThreadPool

-----------------------------------------------------------------------


-- 
asterisk-scf/integration/ice-util-cpp.git



More information about the asterisk-scf-commits mailing list