[asterisk-scf-commits] asterisk-scf/release/ice-util-cpp.git branch "master" updated.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Thu May 5 12:29:10 CDT 2011
branch "master" has been updated
via 196bcc8df23a43034975385357819c14557c9530 (commit)
via b0db008f0afff643862fc27ef993e17bd2ad1321 (commit)
from 0267addab4eaaae77d941de5263309b6f29582b2 (commit)
Summary of changes:
CMakeLists.txt | 2 +
ThreadPool/CMakeLists.txt | 10 +
ThreadPool/include/AsteriskSCF/ThreadPool.h | 63 +++
ThreadPool/include/AsteriskSCF/WorkerThread.h | 90 ++++
ThreadPool/src/CMakeLists.txt | 29 +
ThreadPool/src/ThreadPool.cpp | 457 ++++++++++++++++
ThreadPool/src/WorkerThread.cpp | 120 +++++
ThreadPool/test/CMakeLists.txt | 37 ++
ThreadPool/test/ProofOfConcept.cpp | 174 +++++++
ThreadPool/test/TestThreadPool.cpp | 544 ++++++++++++++++++++
{AmiCollector => ThreadPool}/test/test.cpp | 4 +-
WorkQueue/CMakeLists.txt | 10 +
.../include/AsteriskSCF/DefaultQueueListener.h | 58 ++
.../include/AsteriskSCF/SuspendableWorkQueue.h | 49 ++
WorkQueue/include/AsteriskSCF/WorkQueue.h | 48 ++
WorkQueue/src/CMakeLists.txt | 31 ++
WorkQueue/src/DefaultQueueListener.cpp | 107 ++++
WorkQueue/src/SuspendableWorkQueue.cpp | 370 +++++++++++++
WorkQueue/src/WorkQueue.cpp | 169 ++++++
WorkQueue/test/CMakeLists.txt | 32 ++
WorkQueue/test/TestSuspendableWorkQueue.cpp | 489 ++++++++++++++++++
WorkQueue/test/TestWorkQueue.cpp | 335 ++++++++++++
{AmiCollector => WorkQueue}/test/test.cpp | 4 +-
.../test/test.cpp => WorkQueue/test/test2.cpp | 4 +-
24 files changed, 3230 insertions(+), 6 deletions(-)
create mode 100644 ThreadPool/CMakeLists.txt
create mode 100644 ThreadPool/include/AsteriskSCF/ThreadPool.h
create mode 100644 ThreadPool/include/AsteriskSCF/WorkerThread.h
create mode 100644 ThreadPool/src/CMakeLists.txt
create mode 100644 ThreadPool/src/ThreadPool.cpp
create mode 100644 ThreadPool/src/WorkerThread.cpp
create mode 100644 ThreadPool/test/CMakeLists.txt
create mode 100644 ThreadPool/test/ProofOfConcept.cpp
create mode 100644 ThreadPool/test/TestThreadPool.cpp
copy {AmiCollector => ThreadPool}/test/test.cpp (87%)
create mode 100644 WorkQueue/CMakeLists.txt
create mode 100644 WorkQueue/include/AsteriskSCF/DefaultQueueListener.h
create mode 100644 WorkQueue/include/AsteriskSCF/SuspendableWorkQueue.h
create mode 100644 WorkQueue/include/AsteriskSCF/WorkQueue.h
create mode 100644 WorkQueue/src/CMakeLists.txt
create mode 100644 WorkQueue/src/DefaultQueueListener.cpp
create mode 100644 WorkQueue/src/SuspendableWorkQueue.cpp
create mode 100644 WorkQueue/src/WorkQueue.cpp
create mode 100644 WorkQueue/test/CMakeLists.txt
create mode 100644 WorkQueue/test/TestSuspendableWorkQueue.cpp
create mode 100644 WorkQueue/test/TestWorkQueue.cpp
copy {AmiCollector => WorkQueue}/test/test.cpp (87%)
copy AmiCollector/test/test.cpp => WorkQueue/test/test2.cpp (86%)
- Log -----------------------------------------------------------------
commit 196bcc8df23a43034975385357819c14557c9530
Merge: 0267add b0db008
Author: Mark Michelson <mmichelson at digium.com>
Date: Thu May 5 12:28:44 2011 -0500
Merge branch 'workqueue'
commit b0db008f0afff643862fc27ef993e17bd2ad1321
Author: Mark Michelson <mmichelson at digium.com>
Date: Wed Mar 16 13:25:03 2011 -0500
Add initial WorkQueue implementation.
This is missing two key aspects:
1) Locks
2) Callouts to the listener.
These shall be added in short order.
Add listener operations.
Fix up a couple of potential hazards as well, and write the
work cancelation method.
Add locking to protect queue accesses.
Add another potential spot to notify of the queue's emptiness.
I also removed the "XXX make asynchronous" comments because it's not
really necessary. Since all objects are local, there's not a huge
penalty incurred for calling out to the listener synchronously. Also,
since we don't have a proxy to the listener but a handle, it would
not be possible to use AMI AFAIK.
Add tests that exercise the WorkQueue implementation.
I had to do some tweaking to some CMakeLists.txt files to get
things a bit more organized. I also made some tweaks to
WorkQueue.cpp to make tests pass as planned.
Some discussion in \#asterisk-scf-dev has prompted some
new behavior to be added, so that's what I'll do next.
Test return value of executeWork() calls.
Throw exception when trying to cancel nonexistent work.
Use std::find when canceling work. Tests still pass!
Throw WorkExists exception when attempting to enqueue an item already in the queue.
Also added new test cases. All tests pass.
Remove redundant check for emptiness when canceling work.
The std::find operation is really all that's needed. It doesn't
require a special case for an empty queue.
Add initial implementation of SuspendableWorkQueue.
Clarify some language and fix some bugs.
Add some test code.
This has all the same tests from WorkQueue and defines some
further tasks that can be used to test task suspension. Currently
test code compiles but tests do not pass. I will have to investigate
this later.
Get tests to pass by fixing some errors in SuspendableWorkQueue.cpp
First, order of operations was causing workCount() to always return
1 or 0. Second, not initializing the state of the queue caused
operations to behave unexpectedly.
Add a test that tries a multi-step work item.
Add a test to see how the queue handles a potential race condition.
Fix synchronization in ComplexTask test.
My use of conditionals didn't quite do the trick, resulting
in occasional test failures. I made these adjustments and ran
the test about 30 times and saw no failures.
Reorder some code for clarity and add a new assertion.
Add some checks to be sure the listener's workResumable() is called when we expect.
Make the flow for the ComplexTask test less dumb.
Add some comments to the suspendable work queue test.
Add data to WorkExists and SuspendableWorkExists exceptions.
Check that exceptions have correct data in them.
Make sure to place reference operator next to type instead of variable name.
Address Brent's comments on CR-ASTSCF-69 for WorkQueue.
* Changed instances of "Impl" to "Priv"
* Changed 'q' member to "mQueue"
* Use scopes instead of explicit unlock statements
* Simplify flow of several method calls.
* Account for potential mListener == 0 cases
* Lock when setting mListener
* Despite what I said on the review, use a shared_mutex and shared_locks
where applicable.
Tests all still pass.
Make similar changes to SuspendableWorkQueue as I did to WorkQueue.
In addition, there are findDuplicates() methods added for convenience.
I do believe I'll be adding these to the regular WorkQueue as well.
Add a findDuplicates method to WorkQueue
Fix problems pointed out by Kevin during code review.
* Fixed potential race conditions that could result in attempting to
send notices to listeners that had been erased.
* Removed an unnecessary typedef in SuspendableWorkQueue.cpp
* Added a comment to a method of SuspendableWorkQueuePriv indicating
it expects a lock to be held as a precondition.
Adding some initial code for ThreadPool implementation.
It likely doesn't even compile. I'm just pushing it so that I can
get to it when I work on it later over the weekend.
Address review feedback from Ken and Joshua.
* Change name of workCount() to getSize() for queues.
* Remove WorkExists and SuspendableWorkExists exceptions.
* Decapitate a goat
* Remove unnecessary cout statements from tests.
Tests still pass.
Add some documentation for return value of executeWork() methods.
Get Threadpool code compiling.
There are still tons of issues with it, but I'll address them
later.
A compiling and potentially correct version of the thread pool.
Tests are planned, now to write the suckers.
Addressing build issue on Windows.
Add fix for Windows build issue.
Add initial test code.
Everything is actually compiling and linking properly. I am happy.
The only test written so far passed, too. COOL.
Fix up some issues I saw while testing the ThreadPool
* Don't use shared pointers if we don't have to. There were a few
issues fixed here. First, this gets rid of circular reference
issues that I had feared. Second, make sure not to write code
that results in independent shared_ptrs to the same object. The
object will get deleted at an unexpected time.
There may actually be another place where bare pointers will
work well too. We shall see.
* I referenced the wrong vector in zombieThreadDead
* Now calling stateChanged() functions with the ThreadPoolPriv's
lock held. This corrected an issue where state changes were
sometimes sent out of order. I really don't like doing this
and would like to formulate a fix.
* Added some cout debug messages to aid in testing. I may remove
these or I may add logger integration if it turns out these
sorts of messages may be helpful.
Add more tests to the ThreadPool testsuite.
The last test currently fails due invalid use of an iterator
in the ThreadPoolPriv's shrink() method. Hint: Don't use
vector::erase inside a for loop and expect the iterator to
still be usable.
At this point the tests are all passing, valgrind reports no errors, and there are no memory/thread leaks.
The highlights:
* Make sure to join and delete the thread objects when we're done with them.
* Convert many shared pointers to bare ones
The last functional thing I want to do before cleanup/documentation/beautification
is to try to fix the problem where we call the pool listener's statechanged() method
with the pool's lock held.
Hm, forgot to add this file...
Change a parameter to a less silly name.
Adjust based on change to WorkQueueIf.ice to report number of new items added to a queue.
Test code has been altered to be sure this works.
Thread pool code is now complete, at least with regards to code correctness.
* In WorkQueue code, I implemented a "DefaultQueueListener" class. This class
is a simple implementation of a WorkQueueListener. The class creates a thread
that will execute tasks when the WorkQueue has tasks to execute and that will
go idle when there are no tasks to execute. Since I imagine that this type of
construct may be used often, it is included in the WorkQueue source so that
it may be used as desired.
* The ThreadPool now has all its operations queued onto a "Control" queue. The
main reason is so PoolListener methods may be called without a lock being
held, and the execution order of the PoolListener operations can be guaranteed.
Though the code is correct and tests pass, I want to make at least one more run
through the code for cleanup/documentation before opening a code review.
Reduce size of the thread pool test by introducing a handy macro to wait for threads to reach a specific state.
Add helper function to poke ComplexTasks.
Explicitly destroy the QueueListener in the WorkQueuePriv's destructor.
Every now and then a ThreadPool test would fail due to an apparent race
condition. The problem was that the WorkQueuePriv's shared mutex was
being destroyed while its listener thread was still (indirectly) using it.
By explicitly destroying the listener in the destructor, we ensure it
gets destroyed before the shared_mutex does.
I reran the ThreadPool test about 30 times and saw no crashes. I easily
would have seen 4 or 5 in that period prior to this change.
Add another helper function for waiting for task completion.
Add helper method for sending stateChanged notifications.
Add helper methods for killing and zombifying threads.
Several changes for the sake of cleanup
* Use pImpl idiom on WorkerThreads
* Add comments all over explaining things
* Use some typedefs to make things both easier to read
and easier to change if the type of container should
be changed.
Add an explanation to DefaultQueueListener.
Explicitly destroy the SuspendableWorkQueue's listener in its destructor.
Separate WorkerThread code into its own source file.
I think this helps readability a LOT.
Get rid of unused parameter warnings in TestThreadPool.
Convert worker threads to be smart pointers instead of bare pointers.
Get rid of some unused parameters.
Change "poke" to "setState"
Do some re-arranging to privatize things that could be made more private.
This results in the ThreadPool class being declared entirely in ThreadPool.cpp,
thus alleviating the need for a ThreadPoolPriv class.
Don't confuse matters by returning false when an Ice handle is the return type.
Privatise the ThreadQueueListener more.
Remove an unnecessary call since boost::thread has an == overload.
Fix problems that could cause potential hanging.
During testing, about 1 time in 20, the ThreadPool test
would hang. After looking into it further, it was apparent that
the DefaultQueueListener was sitting idle despite the fact that
work had been queued.
The problem is that when either the DefaultQueueListener or
a ThreadPool's WorkerThread goes idle, it would set its own state
to Idle. This could override a different state set externally. The
only way to get out of this state was to queue new work, but in
the tests, all work had already been queued.
Now, there is a special boolean used expressly for thread
awakening. The states of DefaultQueueListener and WorkerThread
have been altered to remove states that are no longer used as
a result.
Make workAdded and idle thread awakening a queued operation.
Adjusted tests as well. AND THEY PASS
Make the empty notification a queued operation.
This isn't done because of resource contention. This is done in order
to preserve order of operations when reporting to a pool listener.
Fix up some documentation.
Resolve potential optimiser infinite loops through trickery and deceit.
Don't ask why I used the commonwealth spellings in my comments. There
is no satisfactory reason.
s/WAIT_FOR_THREADS/WAIT_WHILE/g
Add asterisks-scf-api as a link target for WorkQueue and ThreadPool.
Fix bug where awakened idle threads were not moved into the active container.
Add new test case that exercises thread reactivation.
Adjust DefaultQueueListener test to not use busy loops.
Change code to accommodate new exception guidelines for work cancellation.
Tests have been updated to properly expect and test exception times.
Remove ugliness from the SuspendableWorkQueueTest.
Specifically, get rid of global locks and condition variables since
they really are unnecessary. I also removed all calls to lock.unlock()
in favor of using scoped locks.
Adjust cmake files to deal with the single-build-dir merge.
Add a fix where we do not initialize a boolean.
I have no idea why/how tests were passing before this change. Perhaps
GCC initializes bools to false?
Add initial proof of concept thread pool test before I forget about it.
Comment out ProofOfConcept test since it is incomplete.
Adding what is necessary to have a successful build on Windows.
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 2beaa3d..3fe896e 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -9,3 +9,5 @@ add_subdirectory(SmartProxy)
add_subdirectory(StateReplicator)
add_subdirectory(AmiCollector)
add_subdirectory(TestFixture)
+add_subdirectory(WorkQueue)
+add_subdirectory(ThreadPool)
diff --git a/ThreadPool/CMakeLists.txt b/ThreadPool/CMakeLists.txt
new file mode 100644
index 0000000..b1a0931
--- /dev/null
+++ b/ThreadPool/CMakeLists.txt
@@ -0,0 +1,10 @@
+#
+# Asterisk Scalable Communications Framework
+#
+# Copyright (C) 2011 -- Digium, Inc.
+#
+# All rights reserved.
+#
+
+add_subdirectory(src)
+add_subdirectory(test)
diff --git a/ThreadPool/include/AsteriskSCF/ThreadPool.h b/ThreadPool/include/AsteriskSCF/ThreadPool.h
new file mode 100644
index 0000000..47e9527
--- /dev/null
+++ b/ThreadPool/include/AsteriskSCF/ThreadPool.h
@@ -0,0 +1,63 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2011, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+#pragma once
+
+#include <boost/shared_ptr.hpp>
+
+#include <AsteriskSCF/System/ThreadPool/ThreadPoolIf.h>
+#include <AsteriskSCF/System/WorkQueue/WorkQueueIf.h>
+
+namespace AsteriskSCF
+{
+namespace ThreadPool
+{
+
+/**
+ * Forward declaration of our implementation
+ * of an AsteriskSCF::System::ThreadPool::V1::Pool.
+ *
+ * For more information on its methods, see
+ * AsteriskSCF/System/ThreadPool/ThreadPoolIf.ice
+ */
+class ThreadPool;
+
+class ThreadQueueListener;
+typedef IceUtil::Handle<ThreadQueueListener> ThreadQueueListenerPtr;
+
+class ThreadQueueListenerFactory
+{
+public:
+ ThreadQueueListenerFactory();
+ ThreadQueueListenerPtr createThreadQueueListener(ThreadPool *pool);
+};
+
+/**
+ * For more information on these methods, see
+ * AsteriskSCF/System/ThreadPool/ThreadPoolIf.ice
+ */
+class ASTERISK_SCF_ICEBOX_EXPORT ThreadPoolFactory : public AsteriskSCF::System::ThreadPool::V1::PoolFactory
+{
+public:
+ ThreadPoolFactory();
+ AsteriskSCF::System::ThreadPool::V1::PoolPtr createPool(const AsteriskSCF::System::ThreadPool::V1::PoolListenerPtr& listener,
+ const AsteriskSCF::System::WorkQueue::V1::QueuePtr& queue);
+private:
+};
+
+typedef IceUtil::Handle<ThreadPoolFactory> ThreadPoolFactoryPtr;
+
+}; // end namespace ThreadPool
+}; // end namespace AsteriskSCF
diff --git a/ThreadPool/include/AsteriskSCF/WorkerThread.h b/ThreadPool/include/AsteriskSCF/WorkerThread.h
new file mode 100644
index 0000000..4597a11
--- /dev/null
+++ b/ThreadPool/include/AsteriskSCF/WorkerThread.h
@@ -0,0 +1,90 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2011, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+#pragma once
+
+#include <boost/shared_ptr.hpp>
+
+#include <AsteriskSCF/System/ThreadPool/ThreadPoolIf.h>
+#include <AsteriskSCF/System/WorkQueue/WorkQueueIf.h>
+
+namespace AsteriskSCF
+{
+namespace ThreadPool
+{
+
+enum ThreadState
+{
+ /**
+ * Thread is not scheduled for deletion. The
+ * thread may be actively executing tasks, or it
+ * may be idle, waiting to be woken up.
+ */
+ Alive,
+ /**
+ * Marked for deletion. May still be executing
+ * code but next time it checks its state, it
+ * will die.
+ */
+ Zombie,
+ /**
+ * The ThreadPool considers this thread to
+ * be gone. The thread just needs to get out of
+ * the way ASAP.
+ */
+ Dead
+};
+
+class WorkerThreadListener;
+
+class WorkerThreadPriv;
+
+class WorkerThread : public IceUtil::Shared
+{
+public:
+ WorkerThread(const AsteriskSCF::System::WorkQueue::V1::QueuePtr& workQueue, WorkerThreadListener *listener);
+ ~WorkerThread();
+ /**
+ * Tells a worker thread to change states
+ *
+ * @param newState The state to change to
+ */
+ void setState(ThreadState newState);
+
+ bool operator==(const WorkerThread& rhs);
+private:
+ boost::shared_ptr<WorkerThreadPriv> mPriv;
+};
+
+typedef IceUtil::Handle<WorkerThread> WorkerThreadPtr;
+
+/**
+ * An listener for WorkerThread state changes
+ */
+class WorkerThreadListener
+{
+public:
+ /**
+ * An active thread has become idle
+ */
+ virtual void activeThreadIdle(WorkerThreadPtr thread) = 0;
+ /**
+ * A zombie thread has died
+ */
+ virtual void zombieThreadDead(WorkerThreadPtr thread) = 0;
+};
+
+}; // end namespace ThreadPool
+}; // end namespace AsteriskSCF
diff --git a/ThreadPool/src/CMakeLists.txt b/ThreadPool/src/CMakeLists.txt
new file mode 100644
index 0000000..584d4d4
--- /dev/null
+++ b/ThreadPool/src/CMakeLists.txt
@@ -0,0 +1,29 @@
+#
+# Asterisk Scalable Communications Framework
+#
+# Copyright (C) 2011 -- Digium, Inc.
+#
+# All rights reserved.
+#
+
+asterisk_scf_slice_include_directories(${API_SLICE_DIR})
+
+asterisk_scf_component_init(ThreadPool CXX)
+
+include_directories(../include)
+include_directories(../../WorkQueue/include)
+include_directories(${API_INCLUDE_DIR})
+
+asterisk_scf_component_add_file(ThreadPool
+ ../include/AsteriskSCF/ThreadPool.h)
+asterisk_scf_component_add_file(ThreadPool
+ ../include/AsteriskSCF/WorkerThread.h)
+asterisk_scf_component_add_file(ThreadPool ThreadPool.cpp)
+asterisk_scf_component_add_file(ThreadPool WorkerThread.cpp)
+asterisk_scf_component_add_boost_libraries(ThreadPool thread date_time)
+
+asterisk_scf_component_build_library(ThreadPool)
+target_link_libraries(ThreadPool WorkQueue)
+target_link_libraries(ThreadPool asterisk-scf-api)
+
+asterisk_scf_headers_install(../include/)
diff --git a/ThreadPool/src/ThreadPool.cpp b/ThreadPool/src/ThreadPool.cpp
new file mode 100644
index 0000000..775154c
--- /dev/null
+++ b/ThreadPool/src/ThreadPool.cpp
@@ -0,0 +1,457 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2011, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+
+#include <boost/thread.hpp>
+
+#include <AsteriskSCF/ThreadPool.h>
+#include <AsteriskSCF/WorkerThread.h>
+#include <AsteriskSCF/WorkQueue.h>
+#include <AsteriskSCF/DefaultQueueListener.h>
+
+namespace AsteriskSCF
+{
+namespace ThreadPool
+{
+
+using namespace AsteriskSCF::System::ThreadPool::V1;
+using namespace AsteriskSCF::System::WorkQueue::V1;
+
+class ThreadQueueListener;
+
+typedef std::vector<WorkerThreadPtr> ThreadContainer;
+typedef ThreadContainer::iterator ThreadIterator;
+
+class ThreadPool : public Pool, public WorkerThreadListener
+{
+public:
+
+ ThreadPool(const PoolListenerPtr& listener, const QueuePtr& queue)
+ : mControlQueue(new AsteriskSCF::WorkQueue::WorkQueue), mListener(listener), mQueue(queue), mShuttingDown(false)
+ {
+ mControlQueue->setListener(new AsteriskSCF::WorkQueue::DefaultQueueListener(mControlQueue));
+ ThreadQueueListenerPtr tqListener;
+ ThreadQueueListenerFactory factory;
+ tqListener = factory.createThreadQueueListener(this);
+ mQueue->setListener(tqListener);
+ }
+
+ ~ThreadPool()
+ {
+ //The tricky thing when destroying the thread pool is that
+ //we have to be sure that no queued operations from the worker
+ //threads interfere with destruction.
+ //
+ //First, we make it known we are shutting down, and kill off
+ //the control queue so that no more operations that might
+ //move WorkerThreads from their current containers may run.
+ {
+ boost::lock_guard<boost::mutex> lock(mQueueLock);
+ mShuttingDown = true;
+ mControlQueue = 0;
+ }
+
+ //Now WorkerThreads are stuck in their current containers. All
+ //we need to do now is to kill them off.
+ killThreads(mIdleThreads, mIdleThreads.begin(), mIdleThreads.end());
+ killThreads(mActiveThreads, mActiveThreads.begin(), mActiveThreads.end());
+ killThreads(mZombieThreads, mZombieThreads.begin(), mZombieThreads.end());
+ }
+
+ /**
+ * Override of AsteriskSCF::System::ThreadPool::V1::Pool::setSize
+ */
+ void setSize(int size)
+ {
+ if (size >= 0)
+ {
+ resize(size);
+ }
+ }
+
+ /**
+ * Override of AsteriskSCF::System::ThreadPool::V1::Pool::getQueue
+ */
+ QueuePtr getQueue()
+ {
+ return mQueue;
+ }
+
+ void sendStateChanged()
+ {
+ int activeSize = mActiveThreads.size();
+ int idleSize = mIdleThreads.size();
+ int zombieSize = mZombieThreads.size();
+ mListener->stateChanged(this, activeSize, idleSize, zombieSize);
+ }
+
+ void killThreads(ThreadContainer& container,
+ ThreadIterator first, ThreadIterator last)
+ {
+ container.erase(first, last);
+ }
+
+ void zombifyActiveThreads(ThreadIterator first, ThreadIterator last)
+ {
+ for (ThreadIterator iter = first; iter != last; ++iter)
+ {
+ mZombieThreads.push_back(*iter);
+ (*iter)->setState(Zombie);
+ }
+ mActiveThreads.erase(first, last);
+ }
+
+ /**
+ * Queued task that moves an active thread to the idle thread container.
+ *
+ * This method is queued on the control queue via the
+ * activeThreadIdle() method and is always enqueued by
+ * worker threads.
+ */
+ class ActiveThreadIdle : public Work
+ {
+ public:
+ ActiveThreadIdle(WorkerThreadPtr thread, ThreadPool *priv)
+ : mPriv(priv), mWorkerThread(thread) { }
+
+ void execute()
+ {
+ ThreadIterator iter =
+ std::find(mPriv->mActiveThreads.begin(),
+ mPriv->mActiveThreads.end(), mWorkerThread);
+
+ if (iter != mPriv->mActiveThreads.end())
+ {
+ mPriv->mIdleThreads.push_back(*iter);
+ mPriv->mActiveThreads.erase(iter);
+ }
+ mPriv->sendStateChanged();
+ }
+ private:
+ ThreadPool *mPriv;
+ WorkerThreadPtr mWorkerThread;
+ };
+
+ typedef IceUtil::Handle<ActiveThreadIdle> ActiveThreadIdlePtr;
+
+ void activeThreadIdle(WorkerThreadPtr thread)
+ {
+ boost::lock_guard<boost::mutex> lock(mQueueLock);
+ if (!mShuttingDown)
+ {
+ ActiveThreadIdlePtr task(new ActiveThreadIdle(thread, this));
+ mControlQueue->enqueueWork(task);
+ }
+ }
+
+ /**
+ * Queued task that deletes and removes a zombie thread from the zombie thread container.
+ *
+ * This method is queued on the control queue via the
+ * zombieThreadDead() method and is always enqueued
+ * by worker threads.
+ */
+ class ZombieThreadDead : public Work
+ {
+ public:
+ ZombieThreadDead(WorkerThreadPtr thread, ThreadPool *priv)
+ : mPriv(priv), mWorkerThread(thread) { }
+
+ void execute()
+ {
+ ThreadIterator i = std::find(mPriv->mZombieThreads.begin(),
+ mPriv->mZombieThreads.end(), mWorkerThread);
+ if (i != mPriv->mZombieThreads.end())
+ {
+ mPriv->killThreads(mPriv->mZombieThreads, i, i + 1);
+ }
+ mPriv->sendStateChanged();
+ }
+ private:
+ ThreadPool *mPriv;
+ WorkerThreadPtr mWorkerThread;
+ };
+
+ typedef IceUtil::Handle<ZombieThreadDead> ZombieThreadDeadPtr;
+
+ void zombieThreadDead(WorkerThreadPtr thread)
+ {
+ boost::lock_guard<boost::mutex> lock(mQueueLock);
+ if (!mShuttingDown)
+ {
+ ZombieThreadDeadPtr task(new ZombieThreadDead(thread, this));
+ mControlQueue->enqueueWork(task);
+ }
+ }
+
+ /**
+ * Queued task that causes the thread pool to gain or lose threads
+ *
+ * This is enqueued as a result of calling the resize() method,
+ * which is invoked by ThreadPool::setSize(). This may be called
+ * from any thread.
+ */
+ class Resize : public Work
+ {
+ public:
+ Resize(int numThreads, ThreadPool *priv)
+ : mNumThreads(numThreads), mPriv(priv) { }
+ void execute()
+ {
+ //We don't count zombie threads as being "live" when potentially resizing
+ size_t currentSize = mPriv->mActiveThreads.size() + mPriv->mIdleThreads.size();
+
+ if (currentSize == mNumThreads)
+ {
+ return;
+ }
+
+ if (currentSize < mNumThreads)
+ {
+ grow(mNumThreads - currentSize);
+ }
+ else
+ {
+ shrink(currentSize - mNumThreads);
+ }
+ mPriv->sendStateChanged();
+ }
+ private:
+ /**
+ * Add more threads to the pool.
+ *
+ * All new threads start as Active, so our method here
+ * is simply to add new threads to the end of the
+ * active thread container.
+ */
+ void grow(size_t numNewThreads)
+ {
+ for (size_t i = 0; i < numNewThreads; ++i)
+ {
+ WorkerThreadPtr newThread(new WorkerThread(mPriv->mQueue, mPriv));
+ mPriv->mActiveThreads.push_back(newThread);
+ }
+ }
+
+ /**
+ * Remove threads from the pool.
+ *
+ * The methodology here is pretty simple. First, we eliminate
+ * idle threads since they're not doing anything and can die
+ * easily. If after killing all idle threads we still need
+ * to remove more threads, then the next thing to do is to
+ * zombify some active threads.
+ */
+ void shrink(size_t threadsToKill)
+ {
+ size_t idleThreadsToKill = threadsToKill <= mPriv->mIdleThreads.size() ?
+ threadsToKill : mPriv->mIdleThreads.size();
+ size_t activeThreadsToZombify = threadsToKill - idleThreadsToKill;
+
+ mPriv->killThreads(mPriv->mIdleThreads, mPriv->mIdleThreads.begin(),
+ mPriv->mIdleThreads.begin() + idleThreadsToKill);
+
+ //This isn't really necessary since Resize::execute() checked the
+ //size prior to calling shrink. I'm just paranoid :)
+ assert(activeThreadsToZombify <= mPriv->mActiveThreads.size());
+
+ mPriv->zombifyActiveThreads(mPriv->mActiveThreads.begin(),
+ mPriv->mActiveThreads.begin() + activeThreadsToZombify);
+ }
+
+ const size_t mNumThreads;
+ ThreadPool *mPriv;
+ };
+
+ typedef IceUtil::Handle<Resize> ResizePtr;
+
+ void resize(int numThreads)
+ {
+ boost::lock_guard<boost::mutex> lock(mQueueLock);
+ if (!mShuttingDown)
+ {
+ ResizePtr task(new Resize(numThreads, this));
+ mControlQueue->enqueueWork(task);
+ }
+ }
+
+ /**
+ * Queued task called when we are notified work has been added to the queue.
+ *
+ * When executed, we notify our listener that work has been added and we
+ * awaken idle threads.
+ *
+ * XXX The current method awakens all idle threads. A potential alternative
+ * would be to awaken a number of idle threads equal to the number of
+ * tasks added to the queue.
+ */
+ class WorkAdded : public Work
+ {
+ public:
+ WorkAdded(int numNewWork, bool wasEmpty, ThreadPool *pool)
+ : mNewWork(numNewWork), mWasEmpty(wasEmpty), mPool(pool) { }
+
+ void execute()
+ {
+ mPool->mListener->queueWorkAdded(mPool, mNewWork, mWasEmpty);
+ for (ThreadIterator i = mPool->mIdleThreads.begin();
+ i != mPool->mIdleThreads.end(); ++i)
+ {
+ mPool->mActiveThreads.push_back(*i);
+ (*i)->setState(Alive);
+ }
+ mPool->mIdleThreads.erase(mPool->mIdleThreads.begin(), mPool->mIdleThreads.end());
+ }
+ private:
+ const int mNewWork;
+ const bool mWasEmpty;
+ ThreadPool *mPool;
+ };
+
+ typedef IceUtil::Handle<WorkAdded> WorkAddedPtr;
+
+ void handleWorkAdded(int numNewWork, bool wasEmpty)
+ {
+ boost::lock_guard<boost::mutex> lock(mQueueLock);
+ if (!mShuttingDown)
+ {
+ WorkAddedPtr task(new WorkAdded(numNewWork, wasEmpty, this));
+ mControlQueue->enqueueWork(task);
+ }
+ }
+
+ /**
+ * Queued task called when we are notified our queue has been emptied.
+ *
+ * All that happens here is we relay the empty notice to our pool
+ * listener. The reason this task is queued is not due to resource
+ * contention. Instead, the task is queued in order to preserve proper
+ * order of outbound notifications.
+ */
+ class Emptied : public Work
+ {
+ public:
+ Emptied(ThreadPool *pool) : mPool(pool) { }
+ void execute()
+ {
+ mPool->mListener->queueEmptied(mPool);
+ }
+ private:
+ ThreadPool *mPool;
+ };
+
+ typedef IceUtil::Handle<Emptied> EmptiedPtr;
+
+ void handleEmptied()
+ {
+ boost::lock_guard<boost::mutex> lock(mQueueLock);
+ if (!mShuttingDown)
+ {
+ EmptiedPtr task(new Emptied(this));
+ mControlQueue->enqueueWork(task);
+ }
+ }
+
+ /**
+ * The control queue is where operations to modify
+ * the thread pool are queued. This allows for thread
+ * pool listeners to guarantee that the notifications
+ * they receive arrive in the order that the thread
+ * pool processed the operations that caused those
+ * notifications to be sent.
+ *
+ * This also has the benefit of eliminating locking
+ * from the majority of the thread pool.
+ */
+ QueuePtr mControlQueue;
+
+ PoolListenerPtr mListener;
+ QueuePtr mQueue;
+
+ ThreadContainer mActiveThreads;
+ ThreadContainer mIdleThreads;
+ ThreadContainer mZombieThreads;
+
+ /**
+ * These two members are required solely to make
+ * destruction of the thread pool safe. Since destruction
+ * of the thread pool is the only operation that does not
+ * go through the control queue, we have to be careful
+ * to make sure that no one attempts to make use of the
+ * control queue once we are in the process of
+ * destroying the thread pool.
+ */
+ bool mShuttingDown;
+ boost::mutex mQueueLock;
+};
+
+/**
+ * An implementation of a QueueListener used by a ThreadPool.
+ *
+ * The main job of this listener is to redirect the events to the
+ * Pool's PoolListener.
+ *
+ * For mor information on the QueueListener's methods, see
+ * AsteriskSCF/System/WorkQueue/WorkQueueIf.ice
+ */
+class ThreadQueueListener : public AsteriskSCF::System::WorkQueue::V1::QueueListener
+{
+public:
+ ThreadQueueListener(ThreadPool *pool)
+ : mThreadPool(pool) { }
+ /**
+ * Results in PoolListener::queueWorkAdded being called
+ */
+ void workAdded(int numNewWork, bool wasEmpty)
+ {
+ mThreadPool->handleWorkAdded(numNewWork, wasEmpty);
+ }
+
+ /**
+ * Should never be called since a ThreadPool does not
+ * use a SuspendableQueue
+ */
+ void workResumable()
+ {
+ assert(false);
+ }
+
+ /**
+ * Results in PoolListener::queueEmptied being called
+ */
+ void emptied()
+ {
+ mThreadPool->handleEmptied();
+ }
+private:
+ ThreadPool *mThreadPool;
+};
+
+ThreadQueueListenerFactory::ThreadQueueListenerFactory() { }
+
+ThreadQueueListenerPtr ThreadQueueListenerFactory::createThreadQueueListener(ThreadPool* pool)
+{
+ return new ThreadQueueListener(pool);
+}
+
+ThreadPoolFactory::ThreadPoolFactory() { }
+
+PoolPtr ThreadPoolFactory::createPool(const PoolListenerPtr& listener, const QueuePtr& queue)
+{
+ return new ThreadPool(listener, queue);
+}
+
+}; // end namespace ThreadPool
+}; // end namespace Asterisk SCF
diff --git a/ThreadPool/src/WorkerThread.cpp b/ThreadPool/src/WorkerThread.cpp
new file mode 100644
index 0000000..59cef80
--- /dev/null
+++ b/ThreadPool/src/WorkerThread.cpp
@@ -0,0 +1,120 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2011, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+
+#include <boost/thread.hpp>
+
+#include <AsteriskSCF/System/WorkQueue/WorkQueueIf.h>
+#include <AsteriskSCF/WorkerThread.h>
+
+namespace AsteriskSCF
+{
+namespace ThreadPool
+{
+
+using namespace AsteriskSCF::System::ThreadPool::V1;
+using namespace AsteriskSCF::System::WorkQueue::V1;
+
+class WorkerThreadPriv
+{
+public:
+ WorkerThreadPriv(const QueuePtr& workQueue, WorkerThreadListener *listener,
+ WorkerThread *workerThread)
+ : mWakeUp(false), mState(Alive), mListener(listener), mQueue(workQueue), mWorkerThread(workerThread),
+ mThread(boost::bind(&WorkerThreadPriv::active, this)) { }
+
+ void active()
+ {
+ /**
+ * For all intents and purposes, localAlive
+ * should evaluate to (mState == Alive). We use
+ * a local variable here to get around potential
+ * optimiser behaviour that could result in an
+ * infinite loop.
+ */
+ bool localAlive = true;
+ while (localAlive)
+ {
+ if (!mQueue->executeWork())
+ {
+ localAlive = idle();
+ }
+ }
+
+ // Reaching this portion means the thread is
+ // on death's door. It may have been killed while
+ // it was idle, in which case it can just die
+ // peacefully. If it's a zombie, though, then
+ // it needs to let the ThreadPoolImpl know so
+ // that the thread can be removed from the
+ // vector of zombie threads.
+ if (mState == Zombie)
+ {
+ mListener->zombieThreadDead(mWorkerThread);
+ }
+ }
+
+ bool idle()
+ {
+ {
+ boost::unique_lock<boost::mutex> lock(mLock);
+ if (mState != Alive)
+ {
+ return false;
+ }
+ mListener->activeThreadIdle(mWorkerThread);
+ while (!mWakeUp)
+ {
+ mCond.wait(lock);
+ }
+ mWakeUp = false;
+ return mState == Alive;
+ }
+ }
+
+ bool mWakeUp;
+ ThreadState mState;
+ WorkerThreadListener *mListener;
+ QueuePtr mQueue;
+ WorkerThread *mWorkerThread;
+ boost::condition_variable mCond;
+ boost::mutex mLock;
+ boost::thread mThread;
+};
+
+WorkerThread::WorkerThread(const QueuePtr& workQueue, WorkerThreadListener *listener)
+ : mPriv(new WorkerThreadPriv(workQueue, listener, this)) { }
+
+WorkerThread::~WorkerThread()
+{
+ setState(Dead);
+ mPriv->mThread.join();
+}
+
+void WorkerThread::setState(ThreadState newState)
+{
+ boost::unique_lock<boost::mutex> lock(mPriv->mLock);
+ mPriv->mState = newState;
+ mPriv->mWakeUp = true;
+ mPriv->mCond.notify_one();
+}
+
+bool WorkerThread::operator==(const WorkerThread& rhs)
+{
+ return mPriv->mThread == rhs.mPriv->mThread;
+}
+
+}; //end namespace ThreadPool
+}; //end namespace AsteriskSCF
diff --git a/ThreadPool/test/CMakeLists.txt b/ThreadPool/test/CMakeLists.txt
new file mode 100644
index 0000000..e7cc2e1
--- /dev/null
+++ b/ThreadPool/test/CMakeLists.txt
@@ -0,0 +1,37 @@
+#
+# Asterisk Scalable Communications Framework
+#
+# Copyright (C) 2011 -- Digium, Inc.
+#
+# All rights reserved.
+#
+
+asterisk_scf_component_init(ThreadPoolTest CXX)
+
+include_directories(${API_INCLUDE_DIR})
+include_directories(../src)
+# The test requires a work queue
+include_directories(../../WorkQueue/include)
+include_directories(../include)
+
+
+asterisk_scf_component_add_file(ThreadPoolTest TestThreadPool.cpp)
+asterisk_scf_component_add_file(ThreadPoolTest test.cpp)
+asterisk_scf_component_add_boost_libraries(ThreadPoolTest unit_test_framework)
+
+# asterisk_scf_component_add_file(ProofOfConcept ProofOfConcept.cpp)
+# asterisk_scf_component_add_file(ProofOfConcept test.cpp)
+# asterisk_scf_component_add_boost_libraries(ProofOfConcept unit_test_framework)
+
+asterisk_scf_component_build_standalone(ThreadPoolTest)
+target_link_libraries(ThreadPoolTest asterisk-scf-api)
+target_link_libraries(ThreadPoolTest ThreadPool)
+target_link_libraries(ThreadPoolTest WorkQueue)
+
+# asterisk_scf_component_build_standalone(ProofOfConcept)
+# target_link_libraries(ProofOfConcept asterisk-scf-api)
+# target_link_libraries(ProofOfConcept ThreadPool)
+# target_link_libraries(ProofOfConcept WorkQueue)
+
+asterisk_scf_test_boost(ThreadPoolTest)
+# asterisk_scf_test_boost(ProofOfConcept)
diff --git a/ThreadPool/test/ProofOfConcept.cpp b/ThreadPool/test/ProofOfConcept.cpp
new file mode 100644
index 0000000..78be2c1
--- /dev/null
+++ b/ThreadPool/test/ProofOfConcept.cpp
@@ -0,0 +1,174 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2011, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+
+#include <boost/test/unit_test.hpp>
+
+#include <AsteriskSCF/WorkQueue.h>
+#include <AsteriskSCF/SuspendableWorkQueue.h>
+#include <AsteriskSCF/ThreadPool.h>
+
+/**
+ * The purpose of this file is to test the use of a
+ * threadpool made up of suspendable queues.
+ */
+
+using namespace AsteriskSCF::System::ThreadPool::V1;
+using namespace AsteriskSCF::ThreadPool;
+
+using namespace AsteriskSCF::System::WorkQueue::V1;
+using namespace AsteriskSCF::WorkQueue;
+
+class SimpleTask : public SuspendableWork
+{
+public:
+ SimpleTask() : mTaskCompleted(false) { }
+ SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
+ {
+ std::cout << "Executing a simple task" << std::endl;
+ mTaskCompleted = true;
+ return Complete;
+ }
+ bool mTaskCompleted;
+};
+
+typedef IceUtil::Handle<SimpleTask> SimpleTaskPtr;
+
+/**
+ * This is the "work" that we queue onto our thread
+ * pool's queue. In actuality, it calls into its own
+ * SuspendableQueue to accomplish work. It listens
+ * to itself to determine how to react.
+ */
+class RelatedWork : public Work, public QueueListener
+{
+public:
+ RelatedWork(const QueuePtr& queue) : mThreadQueue(queue),
+ mInternalQueue(new SuspendableWorkQueue(this)) { }
+
+ void enqueueWork(const SuspendableWorkPtr& work)
+ {
+ std::cout << "Queueing the work item into the internal queue" << std::endl;
+ mInternalQueue->enqueueWork(work);
+ }
+
+ void execute()
+ {
+ std::cout << "Going to execute work now" << std::endl;
+ while (mInternalQueue->executeWork());
+ }
+
+ void workAdded(int, bool wasEmpty)
+ {
+ //When work is added, we can be sure
+ //that we are in one of four states:
+ //1. We are already queued in the ThreadPool's
+ // queue.
+ //2. We are not in the ThreadPool's queue because
+ // we were empty.
+ //3. We are not in the ThreadPool's queue because
+ // we are executing work in our internal queue.
+ //4. We are not in the ThreadPool's queue because
+ // we are waiting on suspended work to be resumable.
+ //
+ //The only case where we actually need to react to
+ //work being added is when we're empty.
+ if (wasEmpty)
+ {
+ std::cout << "Work added to empty internal queue. Adding to thread pool queue" << std::endl;
+ mThreadQueue->enqueueWork(this);
+ }
+ }
+
+ void workResumable()
+ {
+ mThreadQueue->enqueueWork(this);
+ }
+
+ void emptied()
+ {
+ }
+
+ QueuePtr mThreadQueue;
+ SuspendableQueuePtr mInternalQueue;
+};
+
+typedef IceUtil::Handle<RelatedWork> RelatedWorkPtr;
+
+/**
+ * Our listener. He's the one that actually controls the
+ * ThreadPool.
+ */
+class POCListener : public PoolListener
+{
+public:
+ POCListener() : activeThreads(0) {}
+ void stateChanged(const PoolPtr& pool, int active, int idle, int)
+ {
+ //This implementation is not designed to necessarily
+ //be the most efficient, but it is designed to get
+ //rid of as many idle threads as it can.
+ //
+ //If there are any idle threads, get rid of them.
+ if (idle > 0)
+ {
+ std::cout << "Idle threads being killed" << std::endl;
+ pool->setSize(active);
+ }
+ activeThreads = active;
+ }
+
+ void queueWorkAdded(const PoolPtr& pool, int count, bool)
+ {
+ //Uh oh, More work added. Let's add more threads.
+ int newSize = activeThreads + count;
+ std::cout << "Pool detected work added. Increasing pool size to " << newSize << std::endl;
+ pool->setSize(newSize);
+ }
+
+ void queueEmptied(const PoolPtr& pool)
+ {
+ //No point in having a bunch of idle threads
+ //sitting around.
+ std::cout << "Threadpool queue empty. Setting size to 0" << std::endl;
+ pool->setSize(0);
+ }
+ size_t activeThreads;
+};
+
+typedef IceUtil::Handle<POCListener> POCListenerPtr;
+
+BOOST_AUTO_TEST_SUITE(ProofOfConcept)
+
+BOOST_AUTO_TEST_CASE(simpleCase)
+{
+ //This first case is very simple. We just
+ //make sure that queueing a task results in
+ //a thread actually doing work.
+ POCListenerPtr listener(new POCListener);
+ QueuePtr queue(new WorkQueue());
+ ThreadPoolFactoryPtr factory(new ThreadPoolFactory);
+ PoolPtr pool = factory->createPool(listener, queue);
+ SimpleTaskPtr task(new SimpleTask);
+ RelatedWorkPtr threadWork(new RelatedWork(queue));
+
+ //Now with the way these classes are set up, we should
+ //just be able to enqueue a piece of work to the queue
+ //and have it automatically execute.
+ threadWork->enqueueWork(task);
+ std::sleep(5);
+}
+
+BOOST_AUTO_TEST_SUITE_END()
diff --git a/ThreadPool/test/TestThreadPool.cpp b/ThreadPool/test/TestThreadPool.cpp
new file mode 100644
index 0000000..7d12f54
--- /dev/null
+++ b/ThreadPool/test/TestThreadPool.cpp
@@ -0,0 +1,544 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2011, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+
+#include <boost/test/unit_test.hpp>
+#include <boost/thread.hpp>
+#include <boost/thread/locks.hpp>
+
+#include <AsteriskSCF/WorkQueue.h>
+#include <AsteriskSCF/ThreadPool.h>
+
+using namespace AsteriskSCF::System::ThreadPool::V1;
+using namespace AsteriskSCF::ThreadPool;
+
+using namespace AsteriskSCF::System::WorkQueue::V1;
+using namespace AsteriskSCF::WorkQueue;
+
+class TestListener : public PoolListener
+{
+public:
+ TestListener() : mActive(0), mIdle(0), mZombie(0), mTasks(0),
+ mWorkAddedNotice(false), mWasEmpty(false), mEmptyNotice(false) { }
+
+ void stateChanged(const PoolPtr&, int active, int idle, int zombie)
+ {
+ boost::unique_lock<boost::mutex> lock(mLock);
+ mActive = active;
+ mIdle = idle;
+ mZombie = zombie;
+ mDone.notify_one();
+ }
+
+ void queueWorkAdded(const PoolPtr&, int count, bool wasEmpty)
+ {
+ boost::lock_guard<boost::mutex> lock(mLock);
+ mTasks = count;
+ mWasEmpty = wasEmpty;
+ mWorkAddedNotice = true;
+ mDone.notify_one();
+ }
+
+ void queueEmptied(const PoolPtr&)
+ {
+ boost::lock_guard<boost::mutex> lock(mLock);
+ mEmptyNotice = true;
+ }
+
+ int mActive;
+ int mIdle;
+ int mZombie;
+ int mTasks;
+
+ bool mWorkAddedNotice;
+ bool mWasEmpty;
+ bool mEmptyNotice;
+
+ boost::mutex mLock;
+ boost::condition_variable mDone;
+};
+
+typedef IceUtil::Handle<TestListener> TestListenerPtr;
+
+class SimpleTask : public Work
+{
+public:
+ SimpleTask() : taskExecuted(false) { }
+ void execute()
+ {
+ boost::unique_lock<boost::mutex> lock(mLock);
+ taskExecuted = true;
+ mDone.notify_one();
+ }
+ bool taskExecuted;
+
+ boost::mutex mLock;
+ boost::condition_variable mDone;
+};
+
+typedef IceUtil::Handle<SimpleTask> SimpleTaskPtr;
+
+class ComplexTask : public Work
+{
+public:
+ ComplexTask() : mContinue(false), taskExecuted(false) { }
+
+ void execute()
+ {
+ //Complex tasks will start their execution
+ //but then halt until they are poked.
+ boost::unique_lock<boost::mutex> lock(mLock);
+ while (!mContinue)
+ {
+ mStall.wait(lock);
+ }
+ //We've been poked, so let's finish up.
+ taskExecuted = true;
+ mDone.notify_one();
+ }
+
+ boost::mutex mLock;
+ boost::condition_variable mStall;
+ boost::condition_variable mDone;
+
+ bool mContinue;
+ bool taskExecuted;
+};
+
+typedef IceUtil::Handle<ComplexTask> ComplexTaskPtr;
+
+#define WAIT_WHILE(condition) \
+{\
+ boost::unique_lock<boost::mutex> lock(listener->mLock);\
+ while ((condition))\
+ {\
+ listener->mDone.wait(lock);\
+ }\
+}\
+
+static void pokeWorker(ComplexTaskPtr& task)
+{
+ boost::unique_lock<boost::mutex> lock(task->mLock);
+ task->mContinue = true;
+ task->mStall.notify_one();
+}
+
+template <class T>
+static void waitForCompletion(T& task)
+{
+ boost::unique_lock<boost::mutex> lock(task->mLock);
+ while (!task->taskExecuted)
+ {
+ task->mDone.wait(lock);
+ }
+}
+
+static void waitForWorkNotice(TestListenerPtr& listener)
+{
+ boost::unique_lock<boost::mutex> lock(listener->mLock);
+ while (!listener->mWorkAddedNotice)
+ {
+ listener->mDone.wait(lock);
+ }
+}
+
+static void waitForEmptyNotice(TestListenerPtr& listener)
+{
+ boost::unique_lock<boost::mutex> lock(listener->mLock);
+ while (!listener->mEmptyNotice)
+ {
+ listener->mDone.wait(lock);
+ }
+}
+
+BOOST_AUTO_TEST_SUITE(ThreadPoolTest)
+
+BOOST_AUTO_TEST_CASE(addWork)
+{
+ BOOST_TEST_MESSAGE("Running addWork test");
+
+ TestListenerPtr listener(new TestListener);
+ QueuePtr queue(new WorkQueue());
+ SimpleTaskPtr work(new SimpleTask());
+ ThreadPoolFactoryPtr factory(new ThreadPoolFactory);
+ PoolPtr pool = factory->createPool(listener, queue);
+
+ queue->enqueueWork(work);
+
+ waitForWorkNotice(listener);
+
+ BOOST_CHECK(listener->mWorkAddedNotice == true);
+ BOOST_CHECK(listener->mWasEmpty == true);
+ BOOST_CHECK(listener->mTasks == 1);
+ BOOST_CHECK(listener->mActive == 0);
+ BOOST_CHECK(listener->mIdle == 0);
+ BOOST_CHECK(listener->mZombie == 0);
+}
+
+BOOST_AUTO_TEST_CASE(threadCreation)
+{
+ BOOST_TEST_MESSAGE("Running threadCreation test");
+
+ TestListenerPtr listener(new TestListener);
+ QueuePtr queue(new WorkQueue());
+ ThreadPoolFactoryPtr factory(new ThreadPoolFactory);
+ PoolPtr pool = factory->createPool(listener, queue);
+
+ pool->setSize(1);
+
+ //The thread will initially be active but will
+ //turn idle nearly immediately since there is no
+ //work to do.
+ WAIT_WHILE(listener->mIdle == 0);
+
+ BOOST_CHECK(listener->mIdle == 1);
+ BOOST_CHECK(listener->mActive == 0);
+ BOOST_CHECK(listener->mZombie == 0);
+}
+
+BOOST_AUTO_TEST_CASE(threadDestruction)
+{
+ BOOST_TEST_MESSAGE("Running threadDestruction test");
+
+ TestListenerPtr listener(new TestListener);
+ QueuePtr queue(new WorkQueue());
+ ThreadPoolFactoryPtr factory(new ThreadPoolFactory);
+ PoolPtr pool = factory->createPool(listener, queue);
+
+ pool->setSize(3);
+
+ WAIT_WHILE(listener->mIdle < 3);
+
+ BOOST_CHECK(listener->mIdle == 3);
+ BOOST_CHECK(listener->mActive == 0);
+ BOOST_CHECK(listener->mZombie == 0);
+
+ pool->setSize(2);
+
+ WAIT_WHILE(listener->mIdle > 2);
+
+ BOOST_CHECK(listener->mIdle == 2);
+ BOOST_CHECK(listener->mActive == 0);
+ BOOST_CHECK(listener->mZombie == 0);
+}
+
+BOOST_AUTO_TEST_CASE(oneTaskOneThread)
+{
+ BOOST_TEST_MESSAGE("Running oneTaskOneThread test");
+
+ TestListenerPtr listener(new TestListener);
+ QueuePtr queue(new WorkQueue());
+ SimpleTaskPtr work(new SimpleTask());
+ ThreadPoolFactoryPtr factory(new ThreadPoolFactory);
+ PoolPtr pool = factory->createPool(listener, queue);
+
+ queue->enqueueWork(work);
+ pool->setSize(1);
+
+ //The thread should execute the work and then
+ //become idle.
+ waitForCompletion(work);
+
+ BOOST_CHECK(work->taskExecuted == true);
+
+ waitForEmptyNotice(listener);
+ BOOST_CHECK(listener->mEmptyNotice == true);
+
+ //The thread should be idle now. Let's make sure
+ //that's happening.
+ WAIT_WHILE(listener->mIdle == 0);
+
+ BOOST_CHECK(listener->mIdle == 1);
+ BOOST_CHECK(listener->mActive == 0);
+ BOOST_CHECK(listener->mZombie == 0);
+}
+
+BOOST_AUTO_TEST_CASE(oneThreadOneTask)
+{
+ BOOST_TEST_MESSAGE("Running oneThreadOneTask test");
+
+ TestListenerPtr listener(new TestListener);
+ QueuePtr queue(new WorkQueue());
+ ThreadPoolFactoryPtr factory(new ThreadPoolFactory);
+ PoolPtr pool = factory->createPool(listener, queue);
+ SimpleTaskPtr work(new SimpleTask());
+
+ pool->setSize(1);
+
+ WAIT_WHILE(listener->mIdle < 1);
+
+ //The thread is idle now. When we queue work, it should
+ //become active and execute the work.
+ queue->enqueueWork(work);
+
+ waitForCompletion(work);
+
+ BOOST_CHECK(work->taskExecuted == true);
+
+ waitForEmptyNotice(listener);
+ BOOST_CHECK(listener->mEmptyNotice == true);
+
+ //And of course, the thread should become idle once work is done
+ WAIT_WHILE(listener->mIdle < 1);
+
+ BOOST_CHECK(listener->mIdle == 1);
+ BOOST_CHECK(listener->mActive == 0);
+ BOOST_CHECK(listener->mZombie == 0);
+}
+
+BOOST_AUTO_TEST_CASE(oneThreadMultipleTasks)
+{
+ BOOST_TEST_MESSAGE("Running oneThreadMultipleTasks test");
+
+ TestListenerPtr listener(new TestListener);
+ QueuePtr queue(new WorkQueue());
+ ThreadPoolFactoryPtr factory(new ThreadPoolFactory);
+ PoolPtr pool = factory->createPool(listener, queue);
+ SimpleTaskPtr work1(new SimpleTask());
+ SimpleTaskPtr work2(new SimpleTask());
+ SimpleTaskPtr work3(new SimpleTask());
+ WorkSeq works;
+
+ works.push_back(work1);
+ works.push_back(work2);
+ works.push_back(work3);
+
+ queue->enqueueWorkSeq(works);
+
+ pool->setSize(1);
+ //The single thread should execute all three tests. We've
+ //ensured in our queue tests that execution happens in the
+ //correct order, so we just need to wait for the third task
+ //to be complete.
+ waitForCompletion(work3);
+
+ BOOST_CHECK(work1->taskExecuted == true);
+ BOOST_CHECK(work2->taskExecuted == true);
+ BOOST_CHECK(work3->taskExecuted == true);
+
+ //And of course, the thread should become idle once work is done
+ WAIT_WHILE(listener->mIdle < 1);
+
+ BOOST_CHECK(listener->mIdle == 1);
+ BOOST_CHECK(listener->mActive == 0);
+ BOOST_CHECK(listener->mZombie == 0);
+}
+
+BOOST_AUTO_TEST_CASE(reactivation)
+{
+ BOOST_TEST_MESSAGE("Running reactivation test");
+
+ TestListenerPtr listener(new TestListener);
+ QueuePtr queue(new WorkQueue());
+ ThreadPoolFactoryPtr factory(new ThreadPoolFactory);
+ PoolPtr pool = factory->createPool(listener, queue);
+ SimpleTaskPtr work1(new SimpleTask());
+
+ queue->enqueueWork(work1);
+
+ pool->setSize(1);
+
+ waitForCompletion(work1);
+
+ BOOST_CHECK(work1->taskExecuted == true);
+
+ WAIT_WHILE(listener->mIdle < 1);
+
+ BOOST_CHECK(listener->mIdle == 1);
+ BOOST_CHECK(listener->mActive == 0);
+ BOOST_CHECK(listener->mZombie == 0);
+
+ //This is the point of the test. Make sure that
+ //the idle thread is reactivated when new work
+ //is queued.
+ SimpleTaskPtr work2(new SimpleTask());
+
+ queue->enqueueWork(work2);
+
+ waitForCompletion(work2);
+
+ BOOST_CHECK(work2->taskExecuted == true);
+
+ WAIT_WHILE(listener->mIdle < 1);
+
+ BOOST_CHECK(listener->mIdle == 1);
+ BOOST_CHECK(listener->mActive == 0);
+ BOOST_CHECK(listener->mZombie == 0);
+}
+
+BOOST_AUTO_TEST_CASE(taskDistribution)
+{
+ BOOST_TEST_MESSAGE("Running taskDistribution test");
+
+ TestListenerPtr listener(new TestListener);
+ QueuePtr queue(new WorkQueue());
+ ThreadPoolFactoryPtr factory(new ThreadPoolFactory);
+ PoolPtr pool = factory->createPool(listener, queue);
+ ComplexTaskPtr work1(new ComplexTask());
+ ComplexTaskPtr work2(new ComplexTask());
+ WorkSeq works;
+
+ works.push_back(work1);
+ works.push_back(work2);
+
+ queue->enqueueWorkSeq(works);
+
+ pool->setSize(2);
+
+ //Since these tasks halt until they are poked,
+ //The two tasks should be evenly divided amongst
+ //the threads.
+ //
+ //Pool operations are asynchronous, so we need to
+ //wait until the listener is informed that there are
+ //two active threads
+ WAIT_WHILE(listener->mActive < 2);
+
+ BOOST_CHECK(listener->mActive == 2);
+ BOOST_CHECK(listener->mIdle == 0);
+ BOOST_CHECK(listener->mZombie == 0);
+
+ //Cool, so let's give those threads a poke
+ pokeWorker(work1);
+ pokeWorker(work2);
+
+ //Now be sure the tasks finish
+ waitForCompletion(work1);
+ waitForCompletion(work2);
+
+ BOOST_CHECK(work1->taskExecuted == true);
+ BOOST_CHECK(work2->taskExecuted == true);
+
+ //And of course, the threads should become idle once work is done
+ WAIT_WHILE(listener->mIdle < 2);
+
+ BOOST_CHECK(listener->mIdle == 2);
+ BOOST_CHECK(listener->mActive == 0);
+ BOOST_CHECK(listener->mZombie == 0);
+}
+
+BOOST_AUTO_TEST_CASE(zombies)
+{
+ BOOST_TEST_MESSAGE("Running zombies test");
+
+ TestListenerPtr listener(new TestListener);
+ QueuePtr queue(new WorkQueue());
+ ThreadPoolFactoryPtr factory(new ThreadPoolFactory);
+ PoolPtr pool = factory->createPool(listener, queue);
+ ComplexTaskPtr work1(new ComplexTask());
+ ComplexTaskPtr work2(new ComplexTask());
+ WorkSeq works;
+
+ works.push_back(work1);
+ works.push_back(work2);
+
+ queue->enqueueWorkSeq(works);
+
+ pool->setSize(2);
+
+ WAIT_WHILE(listener->mActive < 2);
+
+ BOOST_CHECK(listener->mActive == 2);
+ BOOST_CHECK(listener->mIdle == 0);
+ BOOST_CHECK(listener->mZombie == 0);
+
+ //Now we'll set the size down to 0. This should
+ //result in the active threads being turned to
+ //zombies
+ pool->setSize(0);
+
+ WAIT_WHILE(listener->mZombie < 2);
+
+ BOOST_CHECK(listener->mActive == 0);
+ BOOST_CHECK(listener->mIdle == 0);
+ BOOST_CHECK(listener->mZombie == 2);
+
+ //Now we should still be able to poke the work and
+ //have it complete executing.
+ pokeWorker(work1);
+ pokeWorker(work2);
+
+ //Now be sure the tasks finish
+ waitForCompletion(work1);
+ waitForCompletion(work2);
+
+ BOOST_CHECK(work1->taskExecuted == true);
+ BOOST_CHECK(work2->taskExecuted == true);
+
+ //Since the tasks finished executing, the zombie
+ //threads should die.
+ WAIT_WHILE(listener->mZombie > 0);
+
+ BOOST_CHECK(listener->mActive == 0);
+ BOOST_CHECK(listener->mIdle == 0);
+ BOOST_CHECK(listener->mZombie == 0);
+}
+
+BOOST_AUTO_TEST_CASE(moreThreadDestruction)
+{
+ BOOST_TEST_MESSAGE("Running moreThreadDestruction test");
+
+ TestListenerPtr listener(new TestListener);
+ QueuePtr queue(new WorkQueue());
+ ThreadPoolFactoryPtr factory(new ThreadPoolFactory);
+ PoolPtr pool = factory->createPool(listener, queue);
+ ComplexTaskPtr work1(new ComplexTask());
+ ComplexTaskPtr work2(new ComplexTask());
+ WorkSeq works;
+
+ works.push_back(work1);
+ works.push_back(work2);
+
+ queue->enqueueWorkSeq(works);
+
+ pool->setSize(4);
+
+ // All threads start as active, but 2 should become
+ // idle nearly immediately. We don't want to proceed
+ // until we know the two threads have become idle.
+ WAIT_WHILE(listener->mIdle < 2);
+
+ BOOST_CHECK(listener->mActive == 2);
+ BOOST_CHECK(listener->mIdle == 2);
+ BOOST_CHECK(listener->mZombie == 0);
+
+ pool->setSize(1);
+
+ //Previous state was 2 active and 2 idle threads.
+ //Removing 3 threads should kill the 2 idle threads
+ //and change one of the active threads to a zombie.
+ WAIT_WHILE(listener->mIdle > 0 || listener->mZombie == 0);
+
+ BOOST_CHECK(listener->mActive == 1);
+ BOOST_CHECK(listener->mIdle == 0);
+ BOOST_CHECK(listener->mZombie == 1);
+
+ pokeWorker(work1);
+ pokeWorker(work2);
+
+ waitForCompletion(work1);
+ waitForCompletion(work2);
+
+ BOOST_CHECK(work1->taskExecuted == true);
+ BOOST_CHECK(work2->taskExecuted == true);
+
+ WAIT_WHILE(listener->mZombie > 0 || listener->mActive > 0);
+
+ BOOST_CHECK(listener->mActive == 0);
+ BOOST_CHECK(listener->mIdle == 1);
+ BOOST_CHECK(listener->mZombie == 0);
+}
+
+BOOST_AUTO_TEST_SUITE_END()
diff --git a/ThreadPool/test/test.cpp b/ThreadPool/test/test.cpp
new file mode 100644
index 0000000..96c31c2
... 1835 lines suppressed ...
--
asterisk-scf/release/ice-util-cpp.git
More information about the asterisk-scf-commits
mailing list