[asterisk-scf-commits] asterisk-scf/integration/ice-util-cpp.git branch "workqueue" updated.
Commits to the Asterisk SCF project code repositories
asterisk-scf-commits at lists.digium.com
Wed May 4 14:11:43 CDT 2011
branch "workqueue" has been updated
via f7ff93946394a908997595fe8076a2496602a01f (commit)
from abbd30770385cefed7a7c5490713dc8763476c41 (commit)
Summary of changes:
ThreadPool/test/CMakeLists.txt | 10 ++
ThreadPool/test/ProofOfConcept.cpp | 174 ++++++++++++++++++++++++++++++++++++
2 files changed, 184 insertions(+), 0 deletions(-)
create mode 100644 ThreadPool/test/ProofOfConcept.cpp
- Log -----------------------------------------------------------------
commit f7ff93946394a908997595fe8076a2496602a01f
Author: Mark Michelson <mmichelson at digium.com>
Date: Wed May 4 14:11:18 2011 -0500
Add initial proof of concept thread pool test before I forget about it.
diff --git a/ThreadPool/test/CMakeLists.txt b/ThreadPool/test/CMakeLists.txt
index 98172d0..eb84917 100644
--- a/ThreadPool/test/CMakeLists.txt
+++ b/ThreadPool/test/CMakeLists.txt
@@ -19,9 +19,19 @@ asterisk_scf_component_add_file(ThreadPoolTest TestThreadPool.cpp)
asterisk_scf_component_add_file(ThreadPoolTest test.cpp)
asterisk_scf_component_add_boost_libraries(ThreadPoolTest unit_test_framework)
+asterisk_scf_component_add_file(ProofOfConcept ProofOfConcept.cpp)
+asterisk_scf_component_add_file(ProofOfConcept test.cpp)
+asterisk_scf_component_add_boost_libraries(ProofOfConcept unit_test_framework)
+
asterisk_scf_component_build_standalone(ThreadPoolTest)
target_link_libraries(ThreadPoolTest asterisk-scf-api)
target_link_libraries(ThreadPoolTest ThreadPool)
target_link_libraries(ThreadPoolTest WorkQueue)
+asterisk_scf_component_build_standalone(ProofOfConcept)
+target_link_libraries(ProofOfConcept asterisk-scf-api)
+target_link_libraries(ProofOfConcept ThreadPool)
+target_link_libraries(ProofOfConcept WorkQueue)
+
asterisk_scf_test_boost(ThreadPoolTest)
+asterisk_scf_test_boost(ProofOfConcept)
diff --git a/ThreadPool/test/ProofOfConcept.cpp b/ThreadPool/test/ProofOfConcept.cpp
new file mode 100644
index 0000000..a19af82
--- /dev/null
+++ b/ThreadPool/test/ProofOfConcept.cpp
@@ -0,0 +1,174 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2011, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+
+#include <boost/test/unit_test.hpp>
+
+#include <AsteriskSCF/WorkQueue.h>
+#include <AsteriskSCF/SuspendableWorkQueue.h>
+#include <AsteriskSCF/ThreadPool.h>
+
+/**
+ * The purpose of this file is to test the use of a
+ * threadpool made up of suspendable queues.
+ */
+
+using namespace AsteriskSCF::System::ThreadPool::V1;
+using namespace AsteriskSCF::ThreadPool;
+
+using namespace AsteriskSCF::System::WorkQueue::V1;
+using namespace AsteriskSCF::WorkQueue;
+
+class SimpleTask : public SuspendableWork
+{
+public:
+ SimpleTask() : mTaskCompleted(false) { }
+ SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
+ {
+ std::cout << "Executing a simple task" << std::endl;
+ mTaskCompleted = true;
+ return Complete;
+ }
+ bool mTaskCompleted;
+};
+
+typedef IceUtil::Handle<SimpleTask> SimpleTaskPtr;
+
+/**
+ * This is the "work" that we queue onto our thread
+ * pool's queue. In actuality, it calls into its own
+ * SuspendableQueue to accomplish work. It listens
+ * to itself to determine how to react.
+ */
+class RelatedWork : public Work, public QueueListener
+{
+public:
+ RelatedWork(const QueuePtr& queue) : mThreadQueue(queue),
+ mInternalQueue(new SuspendableWorkQueue(this)) { }
+
+ void enqueueWork(const SuspendableWorkPtr& work)
+ {
+ std::cout << "Queueing the work item into the internal queue" << std::endl;
+ mInternalQueue->enqueueWork(work);
+ }
+
+ void execute()
+ {
+ std::cout << "Going to execute work now" << std::endl;
+ while (mInternalQueue->executeWork());
+ }
+
+ void workAdded(int, bool wasEmpty)
+ {
+ //When work is added, we can be sure
+ //that we are in one of four states:
+ //1. We are already queued in the ThreadPool's
+ // queue.
+ //2. We are not in the ThreadPool's queue because
+ // we were empty.
+ //3. We are not in the ThreadPool's queue because
+ // we are executing work in our internal queue.
+ //4. We are not in the ThreadPool's queue because
+ // we are waiting on suspended work to be resumable.
+ //
+ //The only case where we actually need to react to
+ //work being added is when we're empty.
+ if (wasEmpty)
+ {
+ std::cout << "Work added to empty internal queue. Adding to thread pool queue" << std::endl;
+ mThreadQueue->enqueueWork(this);
+ }
+ }
+
+ void workResumable()
+ {
+ mThreadQueue->enqueueWork(this);
+ }
+
+ void emptied()
+ {
+ }
+
+ QueuePtr mThreadQueue;
+ SuspendableQueuePtr mInternalQueue;
+};
+
+typedef IceUtil::Handle<RelatedWork> RelatedWorkPtr;
+
+/**
+ * Our listener. He's the one that actually controls the
+ * ThreadPool.
+ */
+class POCListener : public PoolListener
+{
+public:
+ POCListener() : activeThreads(0) {}
+ void stateChanged(const PoolPtr& pool, int active, int idle, int)
+ {
+ //This implementation is not designed to necessarily
+ //be the most efficient, but it is designed to get
+ //rid of as many idle threads as it can.
+ //
+ //If there are any idle threads, get rid of them.
+ if (idle > 0)
+ {
+ std::cout << "Idle threads being killed" << std::endl;
+ pool->setSize(active);
+ }
+ activeThreads = active;
+ }
+
+ void queueWorkAdded(const PoolPtr& pool, int count, bool)
+ {
+ //Uh oh, More work added. Let's add more threads.
+ int newSize = activeThreads + count;
+ std::cout << "Pool detected work added. Increasing pool size to " << newSize << std::endl;
+ pool->setSize(newSize);
+ }
+
+ void queueEmptied(const PoolPtr& pool)
+ {
+ //No point in having a bunch of idle threads
+ //sitting around.
+ std::cout << "Threadpool queue empty. Setting size to 0" << std::endl;
+ pool->setSize(0);
+ }
+ size_t activeThreads;
+};
+
+typedef IceUtil::Handle<POCListener> POCListenerPtr;
+
+BOOST_AUTO_TEST_SUITE(ProofOfConcept)
+
+BOOST_AUTO_TEST_CASE(simpleCase)
+{
+ //This first case is very simple. We just
+ //make sure that queueing a task results in
+ //a thread actually doing work.
+ POCListenerPtr listener(new POCListener);
+ QueuePtr queue(new WorkQueue());
+ ThreadPoolFactoryPtr factory(new ThreadPoolFactory);
+ PoolPtr pool = factory->createPool(listener, queue);
+ SimpleTaskPtr task(new SimpleTask);
+ RelatedWorkPtr threadWork(new RelatedWork(queue));
+
+ //Now with the way these classes are set up, we should
+ //just be able to enqueue a piece of work to the queue
+ //and have it automatically execute.
+ threadWork->enqueueWork(task);
+ sleep(5);
+}
+
+BOOST_AUTO_TEST_SUITE_END()
-----------------------------------------------------------------------
--
asterisk-scf/integration/ice-util-cpp.git
More information about the asterisk-scf-commits
mailing list