[asterisk-scf-commits] asterisk-scf/integration/ice-util-cpp.git branch "workqueue" updated.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Thu Apr 7 18:15:53 CDT 2011
branch "workqueue" has been updated
via 782ecd7fa52a89e4116c9cbb9c75675e5316bc33 (commit)
from a823445114cf5e0b548c7467e7b8e4a43a4a6e92 (commit)
Summary of changes:
ThreadPool/include/AsteriskSCF/ThreadPool.h | 8 +-
ThreadPool/src/ThreadPool.cpp | 80 ++++++++++++-----------
ThreadPool/test/TestThreadPool.cpp | 95 +++++++++++++++++++++++++--
3 files changed, 135 insertions(+), 48 deletions(-)
- Log -----------------------------------------------------------------
commit 782ecd7fa52a89e4116c9cbb9c75675e5316bc33
Author: Mark Michelson <mmichelson at digium.com>
Date: Thu Apr 7 18:12:37 2011 -0500
At this point the tests are all passing, valgrind reports no errors, and there are no memory/thread leaks.
The highlights:
* Make sure to join and delete the thread objects when we're done with them.
* Convert many shared pointers to bare ones
The last functional thing I want to do before cleanup/documentation/beautification
is to try to fix the problem where we call the pool listener's statechanged() method
with the pool's lock held.
diff --git a/ThreadPool/include/AsteriskSCF/ThreadPool.h b/ThreadPool/include/AsteriskSCF/ThreadPool.h
index 292c993..4911037 100644
--- a/ThreadPool/include/AsteriskSCF/ThreadPool.h
+++ b/ThreadPool/include/AsteriskSCF/ThreadPool.h
@@ -40,14 +40,14 @@ private:
class ThreadQueueListener : public AsteriskSCF::System::WorkQueue::V1::QueueListener
{
public:
- ThreadQueueListener(const boost::shared_ptr<ThreadPoolPriv>& something,
- const AsteriskSCF::System::ThreadPool::V1::PoolPtr& pool);
+ ThreadQueueListener(ThreadPoolPriv *something,
+ AsteriskSCF::System::ThreadPool::V1::Pool *pool);
void workAdded(bool wasEmpty);
void workResumable();
void emptied();
private:
- boost::shared_ptr<ThreadPoolPriv> mThreadPoolPriv;
- AsteriskSCF::System::ThreadPool::V1::PoolPtr mPool;
+ ThreadPoolPriv *mThreadPoolPriv;
+ AsteriskSCF::System::ThreadPool::V1::Pool *mPool;
};
class ThreadPoolFactory : public AsteriskSCF::System::ThreadPool::V1::PoolFactory
diff --git a/ThreadPool/src/ThreadPool.cpp b/ThreadPool/src/ThreadPool.cpp
index 20ad0bd..6dcdfda 100644
--- a/ThreadPool/src/ThreadPool.cpp
+++ b/ThreadPool/src/ThreadPool.cpp
@@ -87,16 +87,25 @@ void WorkerThread::poke(ThreadState newState)
mCond.notify_one();
}
+void WorkerThread::join()
+{
+ mThread.join();
+}
+
class ThreadQueueListener;
class ThreadPoolPriv : public WorkerThreadListener
{
public:
-
- ThreadPoolPriv(const PoolListenerPtr& listener, const QueuePtr& queue, const PoolPtr& pool)
+ ThreadPoolPriv(const PoolListenerPtr& listener, const QueuePtr& queue, Pool *pool)
: mListener(listener), mQueue(queue), mPool(pool)
{
- mQueue->setListener(new ThreadQueueListener(boost::shared_ptr<ThreadPoolPriv> (this), mPool));
+ mQueue->setListener(new ThreadQueueListener(this, mPool));
+ }
+
+ ~ThreadPoolPriv()
+ {
+ resize(0);
}
void activeThreadIdle(WorkerThread *thread)
@@ -117,7 +126,6 @@ public:
activeSize = mActiveThreads.size();
idleSize = mIdleThreads.size();
zombieSize = mZombieThreads.size();
- std::cout << "Active Thread has become idle. Idle: " << idleSize << ". Active: " << activeSize << ". Zombie: " << zombieSize << std::endl;
//XXX I don't like calling listener operations without a lock held since it makes
//it much more difficult to call pool operations from the listener. The problem is
//that by NOT calling with the lock held, state changes can arrive out of order...believe
@@ -136,60 +144,57 @@ public:
std::vector<WorkerThread*>::iterator i = std::find(mZombieThreads.begin(), mZombieThreads.end(), thread);
if (i != mZombieThreads.end())
{
+ WorkerThread *doomed = *i;
mZombieThreads.erase(i);
- delete *i;
+ delete doomed;
}
activeSize = mActiveThreads.size();
idleSize = mIdleThreads.size();
zombieSize = mZombieThreads.size();
- std::cout << "Endeadened a Zombie thread. Idle: " << idleSize << ". Active: " << activeSize << ". Zombie: " << zombieSize << std::endl;
mListener->stateChanged(mPool, activeSize, idleSize, zombieSize);
}
}
void grow(int numNewThreads)
{
- std::cout << "Growing...adding " << numNewThreads << " threads" << std::endl;
- for (int i = 0; i < numNewThreads; ++i)
- {
- WorkerThread *newThread(new WorkerThread(mQueue, this));
- mActiveThreads.push_back(newThread);
- }
+ for (int i = 0; i < numNewThreads; ++i)
+ {
+ WorkerThread *newThread(new WorkerThread(mQueue, this));
+ mActiveThreads.push_back(newThread);
+ }
}
void shrink(int threadsToKill)
{
- std::cout << "Shrinking...removing " << threadsToKill << " threads" << std::endl;
- for (std::vector<WorkerThread*>::iterator i = mIdleThreads.begin();
- i != mIdleThreads.end(); ++i)
+ while (threadsToKill > 0)
{
- mIdleThreads.erase(i);
- (*i)->poke(Dead);
- delete *i;
-
- if (--threadsToKill == 0)
+ std::vector<WorkerThread*>::iterator i = mIdleThreads.begin();
+ if (i == mIdleThreads.end())
{
- return;
+ //We're out of idle threads to kill.
+ break;
}
+ WorkerThread *doomed = *i;
+ mIdleThreads.erase(i);
+ doomed->poke(Dead);
+ doomed->join();
+ delete doomed;
+
+ --threadsToKill;
}
- // If we've made it here, then it means that there weren't enough idle
- // threads to kill. We'll need to zombify some active threads then.
-
- std::cout << "Not enough idle threads to kill. About to zombify " << threadsToKill << " threads" << std::endl;
- for (std::vector<WorkerThread*>::iterator i = mActiveThreads.begin();
- i != mActiveThreads.end(); ++i)
+ while (threadsToKill > 0)
{
- //Active threads, on the other hand, need to at least temporarily be
- //pushed into the zombie container.
- mZombieThreads.push_back(*i);
- (*i)->poke(Zombie);
- mActiveThreads.erase(i);
-
- if (--threadsToKill == 0)
+ // If we've made it here, then it means that there weren't enough idle
+ // threads to kill. We'll need to zombify some active threads then.
+ std::vector<WorkerThread*>::iterator i = mActiveThreads.begin();
+ if (i != mActiveThreads.end())
{
- return;
+ mZombieThreads.push_back(*i);
+ (*i)->poke(Zombie);
+ mActiveThreads.erase(i);
}
+ --threadsToKill;
}
}
@@ -219,14 +224,13 @@ public:
activeSize = mActiveThreads.size();
idleSize = mIdleThreads.size();
zombieSize = mZombieThreads.size();
- std::cout << "Finished resizing. Idle: " << idleSize << ". Active: " << activeSize << ". Zombie: " << zombieSize << std::endl;
mListener->stateChanged(mPool, activeSize, idleSize, zombieSize);
}
}
AsteriskSCF::System::ThreadPool::V1::PoolListenerPtr mListener;
AsteriskSCF::System::WorkQueue::V1::QueuePtr mQueue;
- AsteriskSCF::System::ThreadPool::V1::PoolPtr mPool;
+ AsteriskSCF::System::ThreadPool::V1::Pool *mPool;
std::vector<WorkerThread*> mActiveThreads;
std::vector<WorkerThread*> mIdleThreads;
@@ -235,7 +239,7 @@ public:
boost::mutex mLock;
};
-ThreadQueueListener::ThreadQueueListener(const boost::shared_ptr<ThreadPoolPriv>& something, const PoolPtr& pool)
+ThreadQueueListener::ThreadQueueListener(ThreadPoolPriv *something, Pool *pool)
: mThreadPoolPriv(something), mPool(pool)
{
}
diff --git a/ThreadPool/test/TestThreadPool.cpp b/ThreadPool/test/TestThreadPool.cpp
index 55ba29b..966515b 100644
--- a/ThreadPool/test/TestThreadPool.cpp
+++ b/ThreadPool/test/TestThreadPool.cpp
@@ -39,7 +39,6 @@ public:
mActive = active;
mIdle = idle;
mZombie = zombie;
- std::cout << "Got stateChanged message: idle: " << mIdle << ". active: " << mActive << ". zombie: " << mZombie << std::endl;
mCond.notify_one();
}
@@ -174,8 +173,6 @@ BOOST_AUTO_TEST_CASE(threadDestruction)
QueuePtr queue(new WorkQueue());
PoolPtr pool(new ThreadPool(listener, queue));
- std::cout << "Initializing 3 threads for thread pool" << std::endl;
-
pool->setSize(3);
{
@@ -190,8 +187,6 @@ BOOST_AUTO_TEST_CASE(threadDestruction)
BOOST_CHECK(listener->mActive == 0);
BOOST_CHECK(listener->mZombie == 0);
- std::cout << "Shrinking thread pool to 2 threads" << std::endl;
-
pool->setSize(2);
{
@@ -429,7 +424,7 @@ BOOST_AUTO_TEST_CASE(zombies)
pool->setSize(2);
//Since these tasks halt until they are poked,
- //The two tasks should be evenly divided amongst
+ //The two tasks should be evenly divided among
//the threads.
BOOST_CHECK(listener->mActive == 2);
BOOST_CHECK(listener->mIdle == 0);
@@ -486,4 +481,92 @@ BOOST_AUTO_TEST_CASE(zombies)
BOOST_CHECK(listener->mZombie == 0);
}
+BOOST_AUTO_TEST_CASE(moreThreadDestruction)
+{
+ BOOST_TEST_MESSAGE("Running moreThreadDestruction test");
+
+ TestListenerPtr listener(new TestListener);
+ QueuePtr queue(new WorkQueue());
+ PoolPtr pool(new ThreadPool(listener, queue));
+ ComplexTaskPtr work1(new ComplexTask());
+ ComplexTaskPtr work2(new ComplexTask());
+ WorkSeq works;
+
+ works.push_back(work1);
+ works.push_back(work2);
+
+ queue->enqueueWorkSeq(works);
+
+ pool->setSize(4);
+
+ // All threads start as active, but 2 should become
+ // idle nearly immediately. We don't want to proceed
+ // until we know the two threads have become idle.
+ {
+ boost::unique_lock<boost::mutex> lock(listener->mLock);
+ while (listener->mIdle < 2)
+ {
+ listener->mCond.wait(lock);
+ }
+ }
+
+ BOOST_CHECK(listener->mActive == 2);
+ BOOST_CHECK(listener->mIdle == 2);
+ BOOST_CHECK(listener->mZombie == 0);
+
+ pool->setSize(1);
+
+ //Previous state was 2 active and 2 idle threads.
+ //Removing 3 threads should kill the 2 idle threads
+ //and change one of the active threads to a zombie.
+ {
+ boost::unique_lock<boost::mutex> lock(listener->mLock);
+ while (listener->mIdle > 0 || listener->mZombie == 0)
+ {
+ listener->mCond.wait(lock);
+ }
+ }
+
+ BOOST_CHECK(listener->mActive == 1);
+ BOOST_CHECK(listener->mIdle == 0);
+ BOOST_CHECK(listener->mZombie == 1);
+
+ {
+ boost::unique_lock<boost::mutex> lock1(work1->mLock);
+ boost::unique_lock<boost::mutex> lock2(work2->mLock);
+ work1->mContinue = true;
+ work2->mContinue = true;
+ work1->mStall.notify_one();
+ work2->mStall.notify_one();
+ }
+
+ {
+ boost::unique_lock<boost::mutex> lock1(work1->mLock);
+ boost::unique_lock<boost::mutex> lock2(work2->mLock);
+ while (!work1->taskExecuted)
+ {
+ work1->mDone.wait(lock1);
+ }
+ while (!work2->taskExecuted)
+ {
+ work2->mDone.wait(lock2);
+ }
+ }
+
+ BOOST_CHECK(work1->taskExecuted == true);
+ BOOST_CHECK(work2->taskExecuted == true);
+
+ {
+ boost::unique_lock<boost::mutex> lock(listener->mLock);
+ while (listener->mZombie > 0 || listener->mActive > 0)
+ {
+ listener->mCond.wait(lock);
+ }
+ }
+
+ BOOST_CHECK(listener->mActive == 0);
+ BOOST_CHECK(listener->mIdle == 1);
+ BOOST_CHECK(listener->mZombie == 0);
+}
+
BOOST_AUTO_TEST_SUITE_END()
-----------------------------------------------------------------------
--
asterisk-scf/integration/ice-util-cpp.git
More information about the asterisk-scf-commits
mailing list