[asterisk-scf-commits] asterisk-scf/integration/ice-util-cpp.git branch "workqueue" updated.

Commits to the Asterisk SCF project code repositories asterisk-scf-commits at lists.digium.com
Fri Mar 25 09:35:47 CDT 2011


branch "workqueue" has been updated
       via  894bfb514ff55ea84a7ac6294a037a579b45292e (commit)
      from  77522e48b1b06b4b03c38c64b900f1459ead5c07 (commit)

Summary of changes:
 {WorkQueue => ThreadPool}/CMakeLists.txt    |    0
 ThreadPool/include/AsteriskSCF/ThreadPool.h |   60 +++++++
 ThreadPool/src/CMakeLists.txt               |   23 +++
 ThreadPool/src/ThreadPool.cpp               |  239 +++++++++++++++++++++++++++
 4 files changed, 322 insertions(+), 0 deletions(-)
 copy {WorkQueue => ThreadPool}/CMakeLists.txt (100%)
 create mode 100644 ThreadPool/include/AsteriskSCF/ThreadPool.h
 create mode 100644 ThreadPool/src/CMakeLists.txt
 create mode 100644 ThreadPool/src/ThreadPool.cpp


- Log -----------------------------------------------------------------
commit 894bfb514ff55ea84a7ac6294a037a579b45292e
Author: Mark Michelson <mmichelson at digium.com>
Date:   Fri Mar 25 09:34:49 2011 -0500

    Adding some initial code for ThreadPool implementation.
    
    It likely doesn't even compile. I'm just pushing it so that I can
    get to it when I work on it later over the weekend.

diff --git a/ThreadPool/CMakeLists.txt b/ThreadPool/CMakeLists.txt
new file mode 100644
index 0000000..b1a0931
--- /dev/null
+++ b/ThreadPool/CMakeLists.txt
@@ -0,0 +1,10 @@
+#
+# Asterisk Scalable Communications Framework
+#
+# Copyright (C) 2011 -- Digium, Inc.
+#
+# All rights reserved.
+#
+
+add_subdirectory(src)
+add_subdirectory(test)
diff --git a/ThreadPool/include/AsteriskSCF/ThreadPool.h b/ThreadPool/include/AsteriskSCF/ThreadPool.h
new file mode 100644
index 0000000..ebe3135
--- /dev/null
+++ b/ThreadPool/include/AsteriskSCF/ThreadPool.h
@@ -0,0 +1,60 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2011, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+#pragma once
+
+#include <boost/shared_ptr.hpp>
+
+#include <AsteriskSCF/System/ThreadPool/ThreadPoolIf.h>
+
+namespace AsteriskSCF
+{
+namespace ThreadPool
+{
+
+class WorkerThread;
+
+class ThreadPool : public AsteriskSCF::System::ThreadPool::V1::Pool, public AsteriskSCF::System::WorkQueue::V1::QueueListener
+{
+public:
+    ThreadPool();
+    ThreadPool(int size);
+    void setSize(::Ice::Int);
+    AsteriskSCF::System::WorkQueue::V1::QueuePtr getQueue();
+private:
+    void grow(int numNewThreads);
+    void shrink(int threadsToKill);
+    void activeThreadIdle(const boost::shared_ptr<WorkerThread> &thread);
+    void ThreadPool::zombieThreadDead(const boost::shared_ptr<WorkerThread> &thread);
+
+    AsteriskSCF::System::ThreadPool::V1::PoolListenerPtr mListener;
+    AsteriskSCF::System::WorkQueue::V1::QueuePtr mQueue;
+    
+    std::vector<boost::shared_ptr<WorkerThread> > mActiveThreads;
+    std::vector<boost::shared_ptr<WorkerThread> > mIdleThreads;
+    std::vector<boost::shared_ptr<WorkerThread> > mZombieThreads;
+};
+
+class ThreadPoolFactory : public AsteriskSCF::System::ThreadPool::V1::PoolFactory
+{
+public:
+    ThreadPoolFactory();
+    AsteriskSCF::System::ThreadPool::V1::PoolPtr createPool(const AsteriskSCF::System::ThreadPool::V1::PoolListenerPtr& listener,
+            const AsteriskSCF::System::WorkQueue::V1::QueuePtr& queue);
+private:
+};
+
+}; // end namespace ThreadPool
+}; // end namespace AsteriskSCF
diff --git a/ThreadPool/src/CMakeLists.txt b/ThreadPool/src/CMakeLists.txt
new file mode 100644
index 0000000..d26144c
--- /dev/null
+++ b/ThreadPool/src/CMakeLists.txt
@@ -0,0 +1,23 @@
+#
+# Asterisk Scalable Communications Framework
+#
+# Copyright (C) 2011 -- Digium, Inc.
+#
+# All rights reserved.
+#
+
+asterisk_scf_slice_include_directories(${API_SLICE_DIR})
+
+asterisk_scf_component_init(ThreadPool CXX)
+
+include_directories(../include)
+include_directories(${API_INCLUDE_DIR})
+
+asterisk_scf_component_add_file(ThreadPool
+    ../include/AsteriskSCF/ThreadPool.h)
+asterisk_scf_component_add_file(ThreadPool ThreadPool.cpp)
+asterisk_scf_component_add_boost_libraries(ThreadPool thread)
+
+asterisk_scf_component_build_library(ThreadPool)
+
+asterisk_scf_headers_install(../include/)
diff --git a/ThreadPool/src/ThreadPool.cpp b/ThreadPool/src/ThreadPool.cpp
new file mode 100644
index 0000000..b64d705
--- /dev/null
+++ b/ThreadPool/src/ThreadPool.cpp
@@ -0,0 +1,239 @@
+/*
+ * Asterisk SCF -- An open-source communications framework.
+ *
+ * Copyright (C) 2011, Digium, Inc.
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk SCF project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE.txt file
+ * at the top of the source tree.
+ */
+
+#include <AsteriskSCF/ThreadPool.h>
+#include <AsteriskSCF/WorkQueue.h>
+
+namespace AsteriskSCF
+{
+namespace ThreadPool
+{
+
+using namespace AsteriskSCF::System::ThreadPool::V1;
+using namespace AsteriskSCF::System::WorkQueue::V1;
+
+class WorkerThread
+{
+public:
+    WorkerThread(const QueuePtr& workQueue)
+        : mQueue(workQueue), mState(Active), mThread(boost::bind(&WorkerThread::active, this)) { }
+
+    void active()
+    {
+        while (mState == Active)
+        {
+            if (!mQueue->executeWork())
+            {
+                idle();
+            }
+        }
+
+        // Reaching this portion means the thread is
+        // on death's door. It may have been killed while
+        // it was idle, in which case it can just die
+        // peacefully. If it's a zombie, though, then
+        // it needs to let the ThreadPoolImpl know so
+        // that the thread can be removed from the
+        // vector of zombie threads.
+        if (mState == Zombie)
+        {
+            mPool->zombieThreadDead(this);
+        }
+    }
+
+    void idle()
+    {
+        boost::unique_lock<boost::mutex> lock(mLock);
+        // If we've been turned into a zombie while we
+        // were active, then just go ahead and return.
+        if (mState == Zombie)
+        {
+            return;
+        }
+
+        // Otherwise, we'll set ourselves idle and wait
+        // for a poke
+        mState = Idle;
+        mPool->activeThreadIdle(this);
+        mCond.wait(lock);
+    }
+
+    void poke(ThreadState newState)
+    {
+        boost::unique_lock<boost::mutex> lock(mLock);
+        mState = newState;
+        mCond.notify_one();
+    }
+
+    enum ThreadState
+    {
+        /**
+         * Actively doing work
+         */
+        Active,
+        /**
+         * Nothing to do; waiting to be poked
+         */
+        Idle,
+        /**
+         * Marked for deletion. May still be executing
+         * code but next time it checks its state, it
+         * will die.
+         *
+         * Basically what happens when we try to kill
+         * a thread when it is Active
+         */
+        Zombie,
+        /**
+         * The ThreadPoolImpl considers this thread to
+         * be gone. The thread just needs to get out of
+         * the way ASAP.
+         *
+         * Basically what happens when we try to kill
+         * a thread when it is Idle
+         */
+        Dead
+    } mState;
+
+    QueuePtr mQueue;
+    boost::thread mThread;
+    boost::condition_variable mCond;
+    boost::mutex mLock;
+};
+
+void ThreadPool::grow(int numNewThreads)
+{
+    for (int i = 0; i < numNewThreads; ++i)
+    {
+        boost::shared_ptr<WorkerThread> newThread(new WorkerThread(mQueue));
+        mActiveThreads.push_back(newThread);
+    }
+}
+
+void ThreadPool::shrink(int threadsToKill)
+{
+    for (std::vector<boost::shared_ptr<WorkerThread> >::iterator i = mIdleThreads.begin();
+            i != mIdleThreads.end(); ++i)
+    {
+        mIdleThreads.erase(i);
+        (*i)->poke(WorkerThread::Dead);
+
+        if (--threadsToKill == 0)
+        {
+            return;
+        }
+    }
+
+    // If we've made it here, then it means that there weren't enough idle
+    // threads to kill. We'll need to zombify some active threads then.
+    for (std::vector<boost::shared_ptr<WorkerThread> >::iterator i = mActiveThreads.begin();
+            i != mThreads.end(); ++i)
+    {
+        //Active threads, on the other hand, need to at least temporarily be
+        //pushed into the zombie container.
+        mZombieThreads.push_back(*i);
+        mActiveThreads.erase(i);
+        (*i)->poke(WorkerThread::Zombie);
+
+        if (--threadsToKill == 0)
+        {
+            return;
+        }
+    }
+}
+
+void ThreadPool::activeThreadIdle(const boost::shared_ptr<WorkerThread> &thread)
+{
+    std::vector<boost::shared_ptr<WorkerThread> >::iterator iter =
+        std::find(mActiveThreads.begin(), mActiveThreads.end(), thread);
+
+    if (iter != mActiveThreads.end())
+    {
+        mIdleThreads.push_back(*iter);
+        mActiveThreads.erase(iter);
+    }
+    mListener->stateChanged(mActiveThreads.size(), mIdleThreads.size(), mZombieThreads.size());
+}
+
+void ThreadPool::zombieThreadDead(const boost::shared_ptr<WorkerThread> &thread)
+{
+    mZombieThreads.erase(std::find(mActiveThreads.begin(), mActiveThreads.end(), thread));
+    mListener->stateChanged(mActiveThreads.size(), mIdleThreads.size(), mZombieThreads.size());
+}
+
+ThreadPool::ThreadPool(const ThreadPoolListenerPtr &listener, const QueuePtr &queue)
+    : mQueue(queue), mListener(listener), mQueueListener(new ThreadPoolQueueListener(this))
+{
+    queue->setListener(mQueue);
+}
+
+void ThreadPool::setSize(int size)
+{
+    int currentSize = mActiveThreads.size() + mIdleThreads.size() + mZombieThreads.size();
+    if (size < currentSize)
+    {
+        shrink(currentSize - size);
+    }
+    else
+    {
+        grow(size - currentSize);
+    }
+    mListener->stateChanged(this, mActiveThreads.size(), mIdleThreads.size(), mZombieThreads.size());
+}
+
+QueuePtr ThreadPool::getQueue()
+{
+    return mQueue;
+}
+
+//QueueListener overloads
+
+void ThreadPool::workAdded(bool wasEmpty)
+{
+    // XXX The interface for PoolListener says that the second parameter of this
+    // method should be the amount of new items added. Well, that's just a pain to
+    // try to keep up with and report accurately. For now, I'm just going to report
+    // the total work count.
+    //
+    // If we want to report the amount of new work, then I would strongly suggest
+    // changing the QueueListener's workAdded method to include the amount of new
+    // work added.
+    mListener->queueWorkAdded(this, mQueue->workCount(), wasEmpty);
+    // XXX This could potentially stand to be more sophisticated, but poking all the
+    // idle threads will get the job done too.
+    //
+    // A potential alternative would be to poke a number of idle threads equal to the
+    // new work count.
+    for (std::vector<boost::shared_ptr<WorkerThread> >::iterator i = mIdleThreads.begin();
+            i != mIdleThreads.end(); ++i)
+    {
+        (*i)->poke(WorkerThread::Active);
+    }
+}
+void ThreadPool::workResumable()
+{
+    /* This should never be called since we use a standard
+     * work queue in the thread pool.
+     */
+    assert(false);
+}
+void ThreadPool::emptied()
+{
+    mListener->queueEmptied(this)
+}
+
+}; // end namespace ThreadPool
+}; // end namespace Asterisk SCF

-----------------------------------------------------------------------


-- 
asterisk-scf/integration/ice-util-cpp.git



More information about the asterisk-scf-commits mailing list