[asterisk-scf-commits] asterisk-scf/integration/ice-util-cpp.git branch "workqueue" updated.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Tue Apr 5 17:30:27 CDT 2011
branch "workqueue" has been updated
via 2ccdcf9806863b2cfe34397f87714b270bd74961 (commit)
via eafaf4cde70b6c8af586082bd9ab07abba9f9d87 (commit)
via 9aa63ed4e16296c5107543bc83ae7e4a02216be8 (commit)
from 37097a645109484af116b30c53a92994d2c4c262 (commit)
Summary of changes:
CMakeLists.txt | 1 +
ThreadPool/CMakeLists.txt | 1 -
ThreadPool/include/AsteriskSCF/ThreadPool.h | 31 ++--
ThreadPool/src/CMakeLists.txt | 2 +
ThreadPool/src/ThreadPool.cpp | 331 ++++++++++++++++-----------
5 files changed, 212 insertions(+), 154 deletions(-)
- Log -----------------------------------------------------------------
commit 2ccdcf9806863b2cfe34397f87714b270bd74961
Merge: eafaf4c 37097a6
Author: Mark Michelson <mmichelson at digium.com>
Date: Tue Apr 5 17:29:56 2011 -0500
Merge branch 'workqueue' of git.asterisk.org:asterisk-scf/integration/ice-util-cpp into workqueue
commit eafaf4cde70b6c8af586082bd9ab07abba9f9d87
Author: Mark Michelson <mmichelson at digium.com>
Date: Tue Apr 5 17:28:09 2011 -0500
A compiling and potentially correct version of the thread pool.
Tests are planned, now to write the suckers.
diff --git a/ThreadPool/include/AsteriskSCF/ThreadPool.h b/ThreadPool/include/AsteriskSCF/ThreadPool.h
index bb3980e..70fad04 100644
--- a/ThreadPool/include/AsteriskSCF/ThreadPool.h
+++ b/ThreadPool/include/AsteriskSCF/ThreadPool.h
@@ -25,7 +25,7 @@ namespace AsteriskSCF
namespace ThreadPool
{
-class WorkerThread;
+class ThreadPoolPriv;
class ThreadPool : public AsteriskSCF::System::ThreadPool::V1::Pool, public AsteriskSCF::System::WorkQueue::V1::QueueListener
{
@@ -33,27 +33,23 @@ public:
ThreadPool(const AsteriskSCF::System::ThreadPool::V1::PoolListenerPtr& listener, const AsteriskSCF::System::WorkQueue::V1::QueuePtr& queue);
void setSize(::Ice::Int);
AsteriskSCF::System::WorkQueue::V1::QueuePtr getQueue();
- void activeThreadIdle(const boost::shared_ptr<WorkerThread> &thread);
- void zombieThreadDead(const boost::shared_ptr<WorkerThread> &thread);
+private:
+ boost::shared_ptr<ThreadPoolPriv> mThreadPoolPriv;
+};
- //QueueListener overloads
+class ThreadQueueListener : public AsteriskSCF::System::WorkQueue::V1::QueueListener
+{
+public:
+ ThreadQueueListener(const boost::shared_ptr<ThreadPoolPriv>& something,
+ const AsteriskSCF::System::ThreadPool::V1::PoolPtr& pool);
void workAdded(bool wasEmpty);
void workResumable();
void emptied();
private:
- void grow(int numNewThreads);
- void shrink(int threadsToKill);
-
- AsteriskSCF::System::WorkQueue::V1::QueuePtr mQueue;
- AsteriskSCF::System::ThreadPool::V1::PoolListenerPtr mListener;
-
- std::vector<boost::shared_ptr<WorkerThread> > mActiveThreads;
- std::vector<boost::shared_ptr<WorkerThread> > mIdleThreads;
- std::vector<boost::shared_ptr<WorkerThread> > mZombieThreads;
+ boost::shared_ptr<ThreadPoolPriv> mThreadPoolPriv;
+ AsteriskSCF::System::ThreadPool::V1::PoolPtr mPool;
};
-typedef IceUtil::Handle<ThreadPool> ThreadPoolPtr;
-
class ThreadPoolFactory : public AsteriskSCF::System::ThreadPool::V1::PoolFactory
{
public:
diff --git a/ThreadPool/src/CMakeLists.txt b/ThreadPool/src/CMakeLists.txt
index d26144c..b7e1aee 100644
--- a/ThreadPool/src/CMakeLists.txt
+++ b/ThreadPool/src/CMakeLists.txt
@@ -15,6 +15,8 @@ include_directories(${API_INCLUDE_DIR})
asterisk_scf_component_add_file(ThreadPool
../include/AsteriskSCF/ThreadPool.h)
+asterisk_scf_component_add_file(ThreadPool
+ ../include/AsteriskSCF/WorkerThread.h)
asterisk_scf_component_add_file(ThreadPool ThreadPool.cpp)
asterisk_scf_component_add_boost_libraries(ThreadPool thread)
diff --git a/ThreadPool/src/ThreadPool.cpp b/ThreadPool/src/ThreadPool.cpp
index 1f05af8..22bdcaf 100644
--- a/ThreadPool/src/ThreadPool.cpp
+++ b/ThreadPool/src/ThreadPool.cpp
@@ -17,6 +17,7 @@
#include <boost/thread.hpp>
#include <AsteriskSCF/ThreadPool.h>
+#include <AsteriskSCF/WorkerThread.h>
namespace AsteriskSCF
{
@@ -26,36 +27,34 @@ namespace ThreadPool
using namespace AsteriskSCF::System::ThreadPool::V1;
using namespace AsteriskSCF::System::WorkQueue::V1;
-class WorkerThread
-{
-public:
- WorkerThread(const QueuePtr& workQueue)
- : mState(Active), mQueue(workQueue), mThread(boost::bind(&WorkerThread::active, this)) { }
+WorkerThread::WorkerThread(const QueuePtr& workQueue, const boost::shared_ptr<WorkerThreadListener>& listener)
+ : mState(Active), mListener(listener), mQueue(workQueue), mThread(boost::bind(&WorkerThread::active, this)) { }
- void active()
+void WorkerThread::active()
+{
+ while (mState == Active)
{
- while (mState == Active)
+ if (!mQueue->executeWork())
{
- if (!mQueue->executeWork())
- {
- idle();
- }
+ idle();
}
+ }
- // Reaching this portion means the thread is
- // on death's door. It may have been killed while
- // it was idle, in which case it can just die
- // peacefully. If it's a zombie, though, then
- // it needs to let the ThreadPoolImpl know so
- // that the thread can be removed from the
- // vector of zombie threads.
- if (mState == Zombie)
- {
- mPool->zombieThreadDead(boost::shared_ptr<WorkerThread>(this));
- }
+ // Reaching this portion means the thread is
+ // on death's door. It may have been killed while
+ // it was idle, in which case it can just die
+ // peacefully. If it's a zombie, though, then
+ // it needs to let the ThreadPoolImpl know so
+ // that the thread can be removed from the
+ // vector of zombie threads.
+ if (mState == Zombie)
+ {
+ mListener->zombieThreadDead(boost::shared_ptr<WorkerThread>(this));
}
+}
- void idle()
+void WorkerThread::idle()
+{
{
boost::unique_lock<boost::mutex> lock(mLock);
// If we've been turned into a zombie while we
@@ -68,142 +67,174 @@ public:
// Otherwise, we'll set ourselves idle and wait
// for a poke
mState = Idle;
- mPool->activeThreadIdle(boost::shared_ptr<WorkerThread>(this));
- mCond.wait(lock);
}
- enum ThreadState
- {
- /**
- * Actively doing work
- */
- Active,
- /**
- * Nothing to do; waiting to be poked
- */
- Idle,
- /**
- * Marked for deletion. May still be executing
- * code but next time it checks its state, it
- * will die.
- *
- * Basically what happens when we try to kill
- * a thread when it is Active
- */
- Zombie,
- /**
- * The ThreadPoolImpl considers this thread to
- * be gone. The thread just needs to get out of
- * the way ASAP.
- *
- * Basically what happens when we try to kill
- * a thread when it is Idle
- */
- Dead
- } mState;
-
- void poke(ThreadState newState)
+ mListener->activeThreadIdle(boost::shared_ptr<WorkerThread>(this));
+
{
boost::unique_lock<boost::mutex> lock(mLock);
- mState = newState;
- mCond.notify_one();
+ while (mState == Idle)
+ {
+ mCond.wait(lock);
+ }
}
+}
- QueuePtr mQueue;
- boost::thread mThread;
- boost::condition_variable mCond;
- boost::mutex mLock;
- ThreadPoolPtr mPool;
-};
-
-void ThreadPool::grow(int numNewThreads)
+void WorkerThread::poke(ThreadState newState)
{
- for (int i = 0; i < numNewThreads; ++i)
- {
- boost::shared_ptr<WorkerThread> newThread(new WorkerThread(mQueue));
- mActiveThreads.push_back(newThread);
- }
+ boost::unique_lock<boost::mutex> lock(mLock);
+ mState = newState;
+ mCond.notify_one();
}
-void ThreadPool::shrink(int threadsToKill)
+class ThreadQueueListener;
+
+class ThreadPoolPriv : public WorkerThreadListener
{
- for (std::vector<boost::shared_ptr<WorkerThread> >::iterator i = mIdleThreads.begin();
- i != mIdleThreads.end(); ++i)
+public:
+
+ ThreadPoolPriv(const PoolListenerPtr& listener, const QueuePtr& queue, const PoolPtr& pool)
+ : mListener(listener), mQueue(queue), mPool(pool)
{
- mIdleThreads.erase(i);
- (*i)->poke(WorkerThread::Dead);
+ mQueue->setListener(new ThreadQueueListener(boost::shared_ptr<ThreadPoolPriv> (this), mPool));
+ }
- if (--threadsToKill == 0)
+ void activeThreadIdle(const boost::shared_ptr<WorkerThread> &thread)
+ {
+ int activeSize;
+ int idleSize;
+ int zombieSize;
{
- return;
+ boost::unique_lock<boost::mutex> lock(mLock);
+ std::vector<boost::shared_ptr<WorkerThread> >::iterator iter =
+ std::find(mActiveThreads.begin(), mActiveThreads.end(), thread);
+
+ if (iter != mActiveThreads.end())
+ {
+ mIdleThreads.push_back(*iter);
+ mActiveThreads.erase(iter);
+ }
+ activeSize = mActiveThreads.size();
+ idleSize = mIdleThreads.size();
+ zombieSize = mZombieThreads.size();
}
+ mListener->stateChanged(mPool, activeSize, idleSize, zombieSize);
}
-
- // If we've made it here, then it means that there weren't enough idle
- // threads to kill. We'll need to zombify some active threads then.
- for (std::vector<boost::shared_ptr<WorkerThread> >::iterator i = mActiveThreads.begin();
- i != mActiveThreads.end(); ++i)
+
+ void zombieThreadDead(const boost::shared_ptr<WorkerThread> &thread)
{
- //Active threads, on the other hand, need to at least temporarily be
- //pushed into the zombie container.
- mZombieThreads.push_back(*i);
- mActiveThreads.erase(i);
- (*i)->poke(WorkerThread::Zombie);
-
- if (--threadsToKill == 0)
+ int activeSize;
+ int idleSize;
+ int zombieSize;
{
- return;
+ boost::unique_lock<boost::mutex> lock(mLock);
+ mZombieThreads.erase(std::find(mActiveThreads.begin(), mActiveThreads.end(), thread));
+ activeSize = mActiveThreads.size();
+ idleSize = mIdleThreads.size();
+ zombieSize = mZombieThreads.size();
}
+ mListener->stateChanged(mPool, activeSize, idleSize, zombieSize);
}
-}
-
-void ThreadPool::activeThreadIdle(const boost::shared_ptr<WorkerThread> &thread)
-{
- std::vector<boost::shared_ptr<WorkerThread> >::iterator iter =
- std::find(mActiveThreads.begin(), mActiveThreads.end(), thread);
-
- if (iter != mActiveThreads.end())
+
+ void grow(int numNewThreads)
{
- mIdleThreads.push_back(*iter);
- mActiveThreads.erase(iter);
+ int activeSize;
+ int idleSize;
+ int zombieSize;
+ {
+ boost::unique_lock<boost::mutex> lock(mLock);
+ for (int i = 0; i < numNewThreads; ++i)
+ {
+ boost::shared_ptr<WorkerThread> newThread(new WorkerThread(mQueue, boost::shared_ptr<ThreadPoolPriv>(this)));
+ mActiveThreads.push_back(newThread);
+ }
+ activeSize = mActiveThreads.size();
+ idleSize = mIdleThreads.size();
+ zombieSize = mZombieThreads.size();
+ }
+ mListener->stateChanged(mPool, activeSize, idleSize, zombieSize);
}
- mListener->stateChanged(this, mActiveThreads.size(), mIdleThreads.size(), mZombieThreads.size());
-}
-
-void ThreadPool::zombieThreadDead(const boost::shared_ptr<WorkerThread> &thread)
-{
- mZombieThreads.erase(std::find(mActiveThreads.begin(), mActiveThreads.end(), thread));
- mListener->stateChanged(this, mActiveThreads.size(), mIdleThreads.size(), mZombieThreads.size());
-}
-
-ThreadPool::ThreadPool(const PoolListenerPtr& listener, const QueuePtr& queue)
- : mQueue(queue), mListener(listener)
-{
- queue->setListener(this);
-}
-
-void ThreadPool::setSize(int size)
-{
- int currentSize = mActiveThreads.size() + mIdleThreads.size() + mZombieThreads.size();
- if (size < currentSize)
+
+ void shrink(int threadsToKill)
{
- shrink(currentSize - size);
+ boost::unique_lock<boost::mutex> lock(mLock);
+ for (std::vector<boost::shared_ptr<WorkerThread> >::iterator i = mIdleThreads.begin();
+ i != mIdleThreads.end(); ++i)
+ {
+ mIdleThreads.erase(i);
+ (*i)->poke(Dead);
+
+ if (--threadsToKill == 0)
+ {
+ return;
+ }
+ }
+
+ // If we've made it here, then it means that there weren't enough idle
+ // threads to kill. We'll need to zombify some active threads then.
+ for (std::vector<boost::shared_ptr<WorkerThread> >::iterator i = mActiveThreads.begin();
+ i != mActiveThreads.end(); ++i)
+ {
+ //Active threads, on the other hand, need to at least temporarily be
+ //pushed into the zombie container.
+ mZombieThreads.push_back(*i);
+ mActiveThreads.erase(i);
+ (*i)->poke(Zombie);
+
+ if (--threadsToKill == 0)
+ {
+ return;
+ }
+ }
}
- else
+
+ void resize(int numThreads)
{
- grow(size - currentSize);
+ int activeSize;
+ int idleSize;
+ int zombieSize;
+ {
+ boost::unique_lock<boost::mutex> lock(mLock);
+ //We don't count zombie threads as being "live" when potentially resizing
+ int currentSize = mActiveThreads.size() + mIdleThreads.size();
+
+ if (currentSize == numThreads)
+ {
+ return;
+ }
+
+ if (currentSize < numThreads)
+ {
+ grow(numThreads - currentSize);
+ }
+ else
+ {
+ shrink(currentSize - numThreads);
+ }
+ activeSize = mActiveThreads.size();
+ idleSize = mIdleThreads.size();
+ zombieSize = mZombieThreads.size();
+ }
+ mListener->stateChanged(mPool, activeSize, idleSize, zombieSize);
}
- mListener->stateChanged(this, mActiveThreads.size(), mIdleThreads.size(), mZombieThreads.size());
-}
-QueuePtr ThreadPool::getQueue()
+ AsteriskSCF::System::ThreadPool::V1::PoolListenerPtr mListener;
+ AsteriskSCF::System::WorkQueue::V1::QueuePtr mQueue;
+ AsteriskSCF::System::ThreadPool::V1::PoolPtr mPool;
+
+ std::vector<boost::shared_ptr<WorkerThread> > mActiveThreads;
+ std::vector<boost::shared_ptr<WorkerThread> > mIdleThreads;
+ std::vector<boost::shared_ptr<WorkerThread> > mZombieThreads;
+
+ boost::mutex mLock;
+};
+
+ThreadQueueListener::ThreadQueueListener(const boost::shared_ptr<ThreadPoolPriv>& something, const PoolPtr& pool)
+ : mThreadPoolPriv(something), mPool(pool)
{
- return mQueue;
}
-//QueueListener overloads
-
-void ThreadPool::workAdded(bool wasEmpty)
+void ThreadQueueListener::workAdded(bool wasEmpty)
{
// XXX The interface for PoolListener says that the second parameter of this
// method should be the amount of new items added. Well, that's just a pain to
@@ -213,28 +244,48 @@ void ThreadPool::workAdded(bool wasEmpty)
// If we want to report the amount of new work, then I would strongly suggest
// changing the QueueListener's workAdded method to include the amount of new
// work added.
- mListener->queueWorkAdded(this, mQueue->getSize(), wasEmpty);
+ mThreadPoolPriv->mListener->queueWorkAdded(mPool, mThreadPoolPriv->mQueue->getSize(), wasEmpty);
// XXX This could potentially stand to be more sophisticated, but poking all the
// idle threads will get the job done too.
//
// A potential alternative would be to poke a number of idle threads equal to the
// new work count.
- for (std::vector<boost::shared_ptr<WorkerThread> >::iterator i = mIdleThreads.begin();
- i != mIdleThreads.end(); ++i)
+ for (std::vector<boost::shared_ptr<WorkerThread> >::iterator i = mThreadPoolPriv->mIdleThreads.begin();
+ i != mThreadPoolPriv->mIdleThreads.end(); ++i)
{
- (*i)->poke(WorkerThread::Active);
+ (*i)->poke(Active);
}
}
-void ThreadPool::workResumable()
+
+void ThreadQueueListener::workResumable()
{
- /* This should never be called since we use a standard
- * work queue in the thread pool.
+ /* This should never be called since the ThreadPool does not
+ * use a SuspendableQueue
*/
assert(false);
}
-void ThreadPool::emptied()
+
+void ThreadQueueListener::emptied()
+{
+ mThreadPoolPriv->mListener->queueEmptied(mPool);
+}
+
+ThreadPool::ThreadPool(const PoolListenerPtr& listener, const QueuePtr& queue)
+ : mThreadPoolPriv(new ThreadPoolPriv(listener, queue, this))
+{
+}
+
+void ThreadPool::setSize(int size)
+{
+ if (size >= 0)
+ {
+ mThreadPoolPriv->resize(size);
+ }
+}
+
+QueuePtr ThreadPool::getQueue()
{
- mListener->queueEmptied(this);
+ return mThreadPoolPriv->mQueue;
}
}; // end namespace ThreadPool
commit 9aa63ed4e16296c5107543bc83ae7e4a02216be8
Author: Mark Michelson <mmichelson at digium.com>
Date: Thu Mar 31 17:42:21 2011 -0500
Get Threadpool code compiling.
There are still tons of issues with it, but I'll address them
later.
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 21a57d0..b728bab 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -7,3 +7,4 @@ add_subdirectory(SmartProxy)
add_subdirectory(StateReplicator)
add_subdirectory(AmiCollector)
add_subdirectory(WorkQueue)
+add_subdirectory(ThreadPool)
diff --git a/ThreadPool/CMakeLists.txt b/ThreadPool/CMakeLists.txt
index b1a0931..5e888f0 100644
--- a/ThreadPool/CMakeLists.txt
+++ b/ThreadPool/CMakeLists.txt
@@ -7,4 +7,3 @@
#
add_subdirectory(src)
-add_subdirectory(test)
diff --git a/ThreadPool/include/AsteriskSCF/ThreadPool.h b/ThreadPool/include/AsteriskSCF/ThreadPool.h
index ebe3135..bb3980e 100644
--- a/ThreadPool/include/AsteriskSCF/ThreadPool.h
+++ b/ThreadPool/include/AsteriskSCF/ThreadPool.h
@@ -18,6 +18,7 @@
#include <boost/shared_ptr.hpp>
#include <AsteriskSCF/System/ThreadPool/ThreadPoolIf.h>
+#include <AsteriskSCF/System/WorkQueue/WorkQueueIf.h>
namespace AsteriskSCF
{
@@ -29,24 +30,30 @@ class WorkerThread;
class ThreadPool : public AsteriskSCF::System::ThreadPool::V1::Pool, public AsteriskSCF::System::WorkQueue::V1::QueueListener
{
public:
- ThreadPool();
- ThreadPool(int size);
+ ThreadPool(const AsteriskSCF::System::ThreadPool::V1::PoolListenerPtr& listener, const AsteriskSCF::System::WorkQueue::V1::QueuePtr& queue);
void setSize(::Ice::Int);
AsteriskSCF::System::WorkQueue::V1::QueuePtr getQueue();
+ void activeThreadIdle(const boost::shared_ptr<WorkerThread> &thread);
+ void zombieThreadDead(const boost::shared_ptr<WorkerThread> &thread);
+
+ //QueueListener overloads
+ void workAdded(bool wasEmpty);
+ void workResumable();
+ void emptied();
private:
void grow(int numNewThreads);
void shrink(int threadsToKill);
- void activeThreadIdle(const boost::shared_ptr<WorkerThread> &thread);
- void ThreadPool::zombieThreadDead(const boost::shared_ptr<WorkerThread> &thread);
- AsteriskSCF::System::ThreadPool::V1::PoolListenerPtr mListener;
AsteriskSCF::System::WorkQueue::V1::QueuePtr mQueue;
+ AsteriskSCF::System::ThreadPool::V1::PoolListenerPtr mListener;
std::vector<boost::shared_ptr<WorkerThread> > mActiveThreads;
std::vector<boost::shared_ptr<WorkerThread> > mIdleThreads;
std::vector<boost::shared_ptr<WorkerThread> > mZombieThreads;
};
+typedef IceUtil::Handle<ThreadPool> ThreadPoolPtr;
+
class ThreadPoolFactory : public AsteriskSCF::System::ThreadPool::V1::PoolFactory
{
public:
diff --git a/ThreadPool/src/ThreadPool.cpp b/ThreadPool/src/ThreadPool.cpp
index b64d705..1f05af8 100644
--- a/ThreadPool/src/ThreadPool.cpp
+++ b/ThreadPool/src/ThreadPool.cpp
@@ -14,8 +14,9 @@
* at the top of the source tree.
*/
+#include <boost/thread.hpp>
+
#include <AsteriskSCF/ThreadPool.h>
-#include <AsteriskSCF/WorkQueue.h>
namespace AsteriskSCF
{
@@ -29,7 +30,7 @@ class WorkerThread
{
public:
WorkerThread(const QueuePtr& workQueue)
- : mQueue(workQueue), mState(Active), mThread(boost::bind(&WorkerThread::active, this)) { }
+ : mState(Active), mQueue(workQueue), mThread(boost::bind(&WorkerThread::active, this)) { }
void active()
{
@@ -50,7 +51,7 @@ public:
// vector of zombie threads.
if (mState == Zombie)
{
- mPool->zombieThreadDead(this);
+ mPool->zombieThreadDead(boost::shared_ptr<WorkerThread>(this));
}
}
@@ -67,17 +68,10 @@ public:
// Otherwise, we'll set ourselves idle and wait
// for a poke
mState = Idle;
- mPool->activeThreadIdle(this);
+ mPool->activeThreadIdle(boost::shared_ptr<WorkerThread>(this));
mCond.wait(lock);
}
- void poke(ThreadState newState)
- {
- boost::unique_lock<boost::mutex> lock(mLock);
- mState = newState;
- mCond.notify_one();
- }
-
enum ThreadState
{
/**
@@ -108,10 +102,18 @@ public:
Dead
} mState;
+ void poke(ThreadState newState)
+ {
+ boost::unique_lock<boost::mutex> lock(mLock);
+ mState = newState;
+ mCond.notify_one();
+ }
+
QueuePtr mQueue;
boost::thread mThread;
boost::condition_variable mCond;
boost::mutex mLock;
+ ThreadPoolPtr mPool;
};
void ThreadPool::grow(int numNewThreads)
@@ -140,7 +142,7 @@ void ThreadPool::shrink(int threadsToKill)
// If we've made it here, then it means that there weren't enough idle
// threads to kill. We'll need to zombify some active threads then.
for (std::vector<boost::shared_ptr<WorkerThread> >::iterator i = mActiveThreads.begin();
- i != mThreads.end(); ++i)
+ i != mActiveThreads.end(); ++i)
{
//Active threads, on the other hand, need to at least temporarily be
//pushed into the zombie container.
@@ -165,19 +167,19 @@ void ThreadPool::activeThreadIdle(const boost::shared_ptr<WorkerThread> &thread)
mIdleThreads.push_back(*iter);
mActiveThreads.erase(iter);
}
- mListener->stateChanged(mActiveThreads.size(), mIdleThreads.size(), mZombieThreads.size());
+ mListener->stateChanged(this, mActiveThreads.size(), mIdleThreads.size(), mZombieThreads.size());
}
void ThreadPool::zombieThreadDead(const boost::shared_ptr<WorkerThread> &thread)
{
mZombieThreads.erase(std::find(mActiveThreads.begin(), mActiveThreads.end(), thread));
- mListener->stateChanged(mActiveThreads.size(), mIdleThreads.size(), mZombieThreads.size());
+ mListener->stateChanged(this, mActiveThreads.size(), mIdleThreads.size(), mZombieThreads.size());
}
-ThreadPool::ThreadPool(const ThreadPoolListenerPtr &listener, const QueuePtr &queue)
- : mQueue(queue), mListener(listener), mQueueListener(new ThreadPoolQueueListener(this))
+ThreadPool::ThreadPool(const PoolListenerPtr& listener, const QueuePtr& queue)
+ : mQueue(queue), mListener(listener)
{
- queue->setListener(mQueue);
+ queue->setListener(this);
}
void ThreadPool::setSize(int size)
@@ -211,7 +213,7 @@ void ThreadPool::workAdded(bool wasEmpty)
// If we want to report the amount of new work, then I would strongly suggest
// changing the QueueListener's workAdded method to include the amount of new
// work added.
- mListener->queueWorkAdded(this, mQueue->workCount(), wasEmpty);
+ mListener->queueWorkAdded(this, mQueue->getSize(), wasEmpty);
// XXX This could potentially stand to be more sophisticated, but poking all the
// idle threads will get the job done too.
//
@@ -232,7 +234,7 @@ void ThreadPool::workResumable()
}
void ThreadPool::emptied()
{
- mListener->queueEmptied(this)
+ mListener->queueEmptied(this);
}
}; // end namespace ThreadPool
-----------------------------------------------------------------------
--
asterisk-scf/integration/ice-util-cpp.git
More information about the asterisk-scf-commits
mailing list