[asterisk-scf-commits] asterisk-scf/integration/ice-util-cpp.git branch "workqueue" updated.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Wed May 4 14:11:43 CDT 2011


branch "workqueue" has been updated
       via  f7ff93946394a908997595fe8076a2496602a01f (commit)
      from  abbd30770385cefed7a7c5490713dc8763476c41 (commit)

Summary of changes:
 ThreadPool/test/CMakeLists.txt     |   10 ++
 ThreadPool/test/ProofOfConcept.cpp |  174 ++++++++++++++++++++++++++++++++++++
 2 files changed, 184 insertions(+), 0 deletions(-)
 create mode 100644 ThreadPool/test/ProofOfConcept.cpp


- Log -----------------------------------------------------------------
commit f7ff93946394a908997595fe8076a2496602a01f
Author: Mark Michelson <mmichelson at digium.com>
Date:   Wed May 4 14:11:18 2011 -0500

    Add initial proof of concept thread pool test before I forget about it.

diff --git a/ThreadPool/test/CMakeLists.txt b/ThreadPool/test/CMakeLists.txt
index 98172d0..eb84917 100644
--- a/ThreadPool/test/CMakeLists.txt
+++ b/ThreadPool/test/CMakeLists.txt
@@ -19,9 +19,19 @@ asterisk_scf_component_add_file(ThreadPoolTest TestThreadPool.cpp)
 asterisk_scf_component_add_file(ThreadPoolTest test.cpp)
 asterisk_scf_component_add_boost_libraries(ThreadPoolTest unit_test_framework)
 
+asterisk_scf_component_add_file(ProofOfConcept ProofOfConcept.cpp)
+asterisk_scf_component_add_file(ProofOfConcept test.cpp)
+asterisk_scf_component_add_boost_libraries(ProofOfConcept unit_test_framework)
+
 asterisk_scf_component_build_standalone(ThreadPoolTest)
 target_link_libraries(ThreadPoolTest asterisk-scf-api)
 target_link_libraries(ThreadPoolTest ThreadPool)
 target_link_libraries(ThreadPoolTest WorkQueue)
 
+asterisk_scf_component_build_standalone(ProofOfConcept)
+target_link_libraries(ProofOfConcept asterisk-scf-api)
+target_link_libraries(ProofOfConcept ThreadPool)
+target_link_libraries(ProofOfConcept WorkQueue)
+
 asterisk_scf_test_boost(ThreadPoolTest)
+asterisk_scf_test_boost(ProofOfConcept)
diff --git a/ThreadPool/test/ProofOfConcept.cpp b/ThreadPool/test/ProofOfConcept.cpp
new file mode 100644
index 0000000..a19af82
--- /dev/null
+++ b/ThreadPool/test/ProofOfConcept.cpp
@@ -0,0 +1,174 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2011, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+
+#include <boost/test/unit_test.hpp>
+
+#include <AsteriskSCF/WorkQueue.h>
+#include <AsteriskSCF/SuspendableWorkQueue.h>
+#include <AsteriskSCF/ThreadPool.h>
+
+/**
+ * The purpose of this file is to test the use of a 
+ * threadpool made up of suspendable queues.
+ */
+
+using namespace AsteriskSCF::System::ThreadPool::V1;
+using namespace AsteriskSCF::ThreadPool;
+
+using namespace AsteriskSCF::System::WorkQueue::V1;
+using namespace AsteriskSCF::WorkQueue;
+
+class SimpleTask : public SuspendableWork
+{
+public:
+    SimpleTask() : mTaskCompleted(false) { }
+    SuspendableWorkResult execute(const SuspendableWorkListenerPtr&)
+    {
+        std::cout << "Executing a simple task" << std::endl;
+        mTaskCompleted = true;
+        return Complete;
+    }
+    bool mTaskCompleted;
+};
+
+typedef IceUtil::Handle<SimpleTask> SimpleTaskPtr;
+
+/**
+ * This is the "work" that we queue onto our thread
+ * pool's queue. In actuality, it calls into its own
+ * SuspendableQueue to accomplish work. It listens
+ * to itself to determine how to react.
+ */
+class RelatedWork : public Work, public QueueListener
+{
+public:
+    RelatedWork(const QueuePtr& queue) : mThreadQueue(queue),
+        mInternalQueue(new SuspendableWorkQueue(this)) { }
+
+    void enqueueWork(const SuspendableWorkPtr& work)
+    {
+        std::cout << "Queueing the work item into the internal queue" << std::endl;
+        mInternalQueue->enqueueWork(work);
+    }
+
+    void execute()
+    {
+        std::cout << "Going to execute work now" << std::endl;
+        while (mInternalQueue->executeWork());
+    }
+
+    void workAdded(int, bool wasEmpty)
+    {
+        //When work is added, we can be sure
+        //that we are in one of four states:
+        //1. We are already queued in the ThreadPool's
+        //   queue.
+        //2. We are not in the ThreadPool's queue because
+        //   we were empty.
+        //3. We are not in the ThreadPool's queue because
+        //   we are executing work in our internal queue.
+        //4. We are not in the ThreadPool's queue because
+        //   we are waiting on suspended work to be resumable.
+        //
+        //The only case where we actually need to react to
+        //work being added is when we're empty.
+        if (wasEmpty)
+        {
+            std::cout << "Work added to empty internal queue. Adding to thread pool queue" << std::endl;
+            mThreadQueue->enqueueWork(this);
+        }
+    }
+
+    void workResumable()
+    {
+        mThreadQueue->enqueueWork(this);
+    }
+
+    void emptied()
+    {
+    }
+
+    QueuePtr mThreadQueue;
+    SuspendableQueuePtr mInternalQueue;
+};
+
+typedef IceUtil::Handle<RelatedWork> RelatedWorkPtr;
+
+/**
+ * Our listener. He's the one that actually controls the
+ * ThreadPool.
+ */
+class POCListener : public PoolListener
+{
+public:
+    POCListener() : activeThreads(0) {}
+    void stateChanged(const PoolPtr& pool, int active, int idle, int)
+    {
+        //This implementation is not designed to necessarily
+        //be the most efficient, but it is designed to get
+        //rid of as many idle threads as it can.
+        //
+        //If there are any idle threads, get rid of them.
+        if (idle > 0)
+        {
+            std::cout << "Idle threads being killed" << std::endl;
+            pool->setSize(active);
+        }
+        activeThreads = active;
+    }
+
+    void queueWorkAdded(const PoolPtr& pool, int count, bool)
+    {
+        //Uh oh, More work added. Let's add more threads.
+        int newSize = activeThreads + count;
+        std::cout << "Pool detected work added. Increasing pool size to " << newSize << std::endl;
+        pool->setSize(newSize);
+    }
+
+    void queueEmptied(const PoolPtr& pool)
+    {
+        //No point in having a bunch of idle threads
+        //sitting around.
+        std::cout << "Threadpool queue empty. Setting size to 0" << std::endl;
+        pool->setSize(0);
+    }
+    size_t activeThreads;
+};
+
+typedef IceUtil::Handle<POCListener> POCListenerPtr;
+
+BOOST_AUTO_TEST_SUITE(ProofOfConcept)
+
+BOOST_AUTO_TEST_CASE(simpleCase)
+{
+    //This first case is very simple. We just
+    //make sure that queueing a task results in
+    //a thread actually doing work.
+    POCListenerPtr listener(new POCListener);
+    QueuePtr queue(new WorkQueue());
+    ThreadPoolFactoryPtr factory(new ThreadPoolFactory);
+    PoolPtr pool = factory->createPool(listener, queue);
+    SimpleTaskPtr task(new SimpleTask);
+    RelatedWorkPtr threadWork(new RelatedWork(queue));
+
+    //Now with the way these classes are set up, we should
+    //just be able to enqueue a piece of work to the queue
+    //and have it automatically execute.
+    threadWork->enqueueWork(task);
+    sleep(5);
+}
+
+BOOST_AUTO_TEST_SUITE_END()

-----------------------------------------------------------------------


-- 
asterisk-scf/integration/ice-util-cpp.git



More information about the asterisk-scf-commits mailing list